summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-10-17 09:56:08 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-10-17 09:56:08 +0000
commitd0c01828bf846d555ba648c342e58f2aa71b6cc7 (patch)
tree134f0aaa67d6e6d678970db09d222aa6edcf64d6 /storage
parent4d60d2845dba1fcfb2b68bd64a616d50578bf88c (diff)
Make setActiveState async.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp40
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h7
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h2
-rw-r--r--storage/src/vespa/storage/persistence/splitjoinhandler.cpp32
-rw-r--r--storage/src/vespa/storage/persistence/splitjoinhandler.h1
7 files changed, 52 insertions, 41 deletions
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 4a28e650fac..d2bfabd2950 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -3,11 +3,16 @@
#include "asynchandler.h"
#include "persistenceutil.h"
#include "testandsethelper.h"
+#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".storage.persistence.asynchandler");
+
namespace storage {
namespace {
@@ -88,10 +93,12 @@ private:
}
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
+ BucketOwnershipNotifier &bucketOwnershipNotifier,
vespalib::ISequencedTaskExecutor & executor,
const document::BucketIdFactory & bucketIdFactory)
: _env(env),
_spi(spi),
+ _bucketOwnershipNotifier(bucketOwnershipNotifier),
_sequencedExecutor(executor),
_bucketIdFactory(bucketIdFactory)
{}
@@ -135,6 +142,39 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
+{
+ trackerUP->setMetric(_env._metrics.setBucketStates);
+
+ //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
+ spi::Bucket bucket(cmd.getBucket());
+ bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
+ spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
+
+ auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket,
+ notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable {
+ if (tracker->checkForError(*response)) {
+ StorBucketDatabase::WrappedEntry
+ entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(),
+ "handleSetBucketState");
+ if (entry.exist()) {
+ entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
+ notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
+ entry.write();
+ } else {
+ LOG(warning, "Got OK setCurrentState result from provider for %s, "
+ "but bucket has disappeared from service layer database",
+ cmd.getBucketId().toString().c_str());
+ }
+ tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
+ }
+ tracker->sendReply();
+ });
+ _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return trackerUP;
+}
+
+MessageTracker::UP
AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const
{
MessageTracker & tracker = *trackerUP;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 23f3605dca1..bf37becb2c3 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -14,6 +14,7 @@ namespace spi {
class Context;
}
class PersistenceUtil;
+class BucketOwnershipNotifier;
/**
* Handle async operations that uses a sequenced executor.
@@ -21,12 +22,13 @@ class PersistenceUtil;
*/
class AsyncHandler : public Types {
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor,
- const document::BucketIdFactory & bucketIdFactory);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &,
+ vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
@@ -34,6 +36,7 @@ private:
spi::Context & context, bool missingDocumentImpliesMatch = false) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
+ BucketOwnershipNotifier & _bucketOwnershipNotifier;
vespalib::ISequencedTaskExecutor & _sequencedExecutor;
const document::BucketIdFactory & _bucketIdFactory;
};
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 5315d3ec0bc..3d9b359f506 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -20,7 +20,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
_mergeHandler(_env, provider, component.cluster_context(), _clock,
cfg.bucketMergeChunkSize,
cfg.commonMergeChainOptimalizationMinimumSize),
- _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
+ _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider)
{
@@ -62,7 +62,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::APPLYBUCKETDIFF_ID:
return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::SETBUCKETSTATE_ID:
- return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 471d3d62a35..ab5066576fd 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste
return checkResult(_impl.setClusterState(bucketSpace, state));
}
-spi::Result
-ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState)
+void
+ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.setActiveState(bucket, newState));
+ onComplete->addResultHandler(this);
+ _impl.setActiveStateAsync(bucket, newState, std::move(onComplete));
}
spi::BucketInfoResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index d23cce9172a..9361cd1d19d 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -41,7 +41,7 @@ public:
spi::Result initialize() override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
- spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override;
+ void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
index 0856f45c3ff..d5b44cc1911 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
@@ -5,7 +5,6 @@
#include "bucketownershipnotifier.h"
#include "splitbitdetector.h"
#include "messages.h"
-#include <vespa/storage/common/bucketmessages.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storageapi/message/bucket.h>
@@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker
}
MessageTracker::UP
-SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.setBucketStates);
- NotificationGuard notifyGuard(_bucketOwnershipNotifier);
-
- LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
- spi::Bucket bucket(cmd.getBucket());
- bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
- spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
-
- spi::Result result(_spi.setActiveState(bucket, newState));
- if (tracker->checkForError(result)) {
- StorBucketDatabase::WrappedEntry
- entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState");
- if (entry.exist()) {
- entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
- notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
- entry.write();
- } else {
- LOG(warning, "Got OK setCurrentState result from provider for %s, "
- "but bucket has disappeared from service layer database",
- cmd.getBucketId().toString().c_str());
- }
-
- tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
- }
-
- return tracker;
-}
-
-MessageTracker::UP
SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.recheckBucketInfo);
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h
index ddfa22b154c..4521e520ee9 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.h
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h
@@ -21,7 +21,6 @@ public:
SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &,
BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization);
MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const;
private: