summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp14
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h2
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp5
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h2
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp8
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h3
-rw-r--r--searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp5
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp4
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp19
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/buckethandler.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h2
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp40
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h7
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h2
-rw-r--r--storage/src/vespa/storage/persistence/splitjoinhandler.cpp32
-rw-r--r--storage/src/vespa/storage/persistence/splitjoinhandler.h1
23 files changed, 101 insertions, 76 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 66f03edafa2..ff9676da7d0 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -405,9 +405,8 @@ DummyPersistence::setClusterState(BucketSpace bucketSpace, const ClusterState& c
return Result();
}
-Result
-DummyPersistence::setActiveState(const Bucket& b,
- BucketInfo::ActiveState newState)
+void
+DummyPersistence::setActiveStateAsync(const Bucket& b, BucketInfo::ActiveState newState, OperationComplete::UP onComplete)
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
LOG(debug, "setCurrentState(%s, %s)",
@@ -416,11 +415,12 @@ DummyPersistence::setActiveState(const Bucket& b,
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
BucketContentGuard::UP bc(acquireBucketWithLock(b));
- if (!bc.get()) {
- return BucketInfoResult(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found");
+ if ( ! bc ) {
+ onComplete->onComplete(std::make_unique<BucketInfoResult>(Result::ErrorType::TRANSIENT_ERROR, "Bucket not found"));
+ } else {
+ (*bc)->setActive(newState == BucketInfo::ACTIVE);
+ onComplete->onComplete(std::make_unique<Result>());
}
- (*bc)->setActive(newState == BucketInfo::ACTIVE);
- return Result();
}
BucketInfoResult
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 486f4cec2f2..9d93316d382 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -155,7 +155,7 @@ public:
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
Result setClusterState(BucketSpace bucketSpace, const ClusterState& newState) override;
- Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override;
+ void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
Result put(const Bucket&, Timestamp, DocumentSP, Context&) override;
GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override;
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index 23a8f600024..e423e0aaac5 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -30,4 +30,9 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const
return BucketIdListResult(list);
}
+void
+AbstractPersistenceProvider::setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP op) {
+ op->onComplete(std::make_unique<Result>());
+}
+
}
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index 2332c05b57f..5f8cf2fc171 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -43,7 +43,7 @@ public:
/**
* Default impl empty.
*/
- Result setActiveState(const Bucket&, BucketInfo::ActiveState) override { return Result(); }
+ void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) override;
/**
* Default impl empty.
*/
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 575a95269c5..bb819fc9e50 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -9,6 +9,14 @@ namespace storage::spi {
PersistenceProvider::~PersistenceProvider() = default;
Result
+PersistenceProvider::setActiveState(const Bucket& bucket, BucketInfo::ActiveState activeState) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ setActiveStateAsync(bucket, activeState, std::move(catcher));
+ return *future.get();
+}
+
+Result
PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) {
auto catcher = std::make_unique<CatchResult>();
auto future = catcher->future_result();
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 56ef21b5c77..45ca49435a7 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -86,7 +86,8 @@ struct PersistenceProvider
* other buckets may be deactivated, so the node must be able to serve
* the data from its secondary index or get reduced coverage.
*/
- virtual Result setActiveState(const Bucket&, BucketInfo::ActiveState) = 0;
+ Result setActiveState(const Bucket&, BucketInfo::ActiveState);
+ virtual void setActiveStateAsync(const Bucket &, BucketInfo::ActiveState, OperationComplete::UP ) = 0;
/**
* Retrieve metadata for a bucket, previously returned in listBuckets(),
diff --git a/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp b/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp
index 487c8741a65..29748a2010c 100644
--- a/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/buckethandler/buckethandler_test.cpp
@@ -96,7 +96,7 @@ struct Fixture
BucketStateCalculator::SP _calc;
test::BucketIdListResultHandler _bucketList;
test::BucketInfoResultHandler _bucketInfo;
- test::GenericResultHandler _genResult;
+ std::shared_ptr<test::GenericResultHandler> _genResult;
Fixture()
: _builder(),
_bucketDB(std::make_shared<bucketdb::BucketDBOwner>()),
@@ -107,7 +107,8 @@ struct Fixture
_handler(_exec),
_changedHandler(),
_calc(new BucketStateCalculator()),
- _bucketList(), _bucketInfo(), _genResult()
+ _bucketList(), _bucketInfo(),
+ _genResult(std::make_shared<test::GenericResultHandler>())
{
// bucket 2 & 3 & 4 & 7 in ready
_ready.insertDocs(_builder.createDocs(2, 1, 3). // 2 docs
diff --git a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
index 2bb1eb44e25..e8cc1b54235 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp
@@ -20,7 +20,7 @@ struct DummyPersistenceHandler : public IPersistenceHandler {
void handleRemove(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentId &) override {}
void handleListBuckets(IBucketIdListResultHandler &) override {}
void handleSetClusterState(const storage::spi::ClusterState &, IGenericResultHandler &) override {}
- void handleSetActiveState(const storage::spi::Bucket &, storage::spi::BucketInfo::ActiveState, IGenericResultHandler &) override {}
+ void handleSetActiveState(const storage::spi::Bucket &, storage::spi::BucketInfo::ActiveState, std::shared_ptr<IGenericResultHandler>) override {}
void handleGetBucketInfo(const storage::spi::Bucket &, IBucketInfoResultHandler &) override {}
void handleCreateBucket(FeedToken, const storage::spi::Bucket &) override {}
void handleDeleteBucket(FeedToken, const storage::spi::Bucket &) override {}
@@ -44,8 +44,6 @@ DummyPersistenceHandler::SP handler_b(std::make_shared<DummyPersistenceHandler>(
DummyPersistenceHandler::SP handler_c(std::make_shared<DummyPersistenceHandler>());
DummyPersistenceHandler::SP handler_a_new(std::make_shared<DummyPersistenceHandler>());
-
-
void
assertHandler(const IPersistenceHandler::SP & lhs, const IPersistenceHandler * rhs)
{
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index 9613c505f77..c252c89a2f8 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -227,10 +227,10 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer {
}
void handleSetActiveState(const Bucket &bucket, storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler) override {
+ std::shared_ptr<IGenericResultHandler> resultHandler) override {
lastBucket = bucket;
lastBucketState = newState;
- resultHandler.handle(bucketStateResult);
+ resultHandler->handle(bucketStateResult);
}
void handleGetBucketInfo(const Bucket &, IBucketInfoResultHandler &resultHandler) override {
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
index b4544868bbe..b393a85f632 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h
@@ -53,7 +53,7 @@ public:
virtual void handleSetActiveState(const storage::spi::Bucket &bucket,
storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler) = 0;
+ std::shared_ptr<IGenericResultHandler> resultHandler) = 0;
virtual void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler) = 0;
virtual void handleCreateBucket(FeedToken token, const storage::spi::Bucket &bucket) = 0;
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 136d95a068b..cb26e80b3ff 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -306,20 +306,21 @@ PersistenceEngine::setClusterState(BucketSpace bucketSpace, const ClusterState &
}
-Result
-PersistenceEngine::setActiveState(const Bucket& bucket,
- storage::spi::BucketInfo::ActiveState newState)
+void
+PersistenceEngine::setActiveStateAsync(const Bucket & bucket, BucketInfo::ActiveState newState, OperationComplete::UP onComplete)
{
ReadGuard rguard(_rwMutex);
HandlerSnapshot snap = getHandlerSnapshot(rguard, bucket.getBucketSpace());
- auto catchResult = std::make_unique<storage::spi::CatchResult>();
- auto futureResult = catchResult->future_result();
- GenericResultHandler resultHandler(snap.size(), std::move(catchResult));
- for (; snap.handlers().valid(); snap.handlers().next()) {
+ auto resultHandler = std::make_shared<GenericResultHandler>(snap.size(), std::move(onComplete));
+ while (snap.handlers().valid()) {
IPersistenceHandler *handler = snap.handlers().get();
- handler->handleSetActiveState(bucket, newState, resultHandler);
+ snap.handlers().next();
+ if (snap.handlers().valid()) {
+ handler->handleSetActiveState(bucket, newState, resultHandler);
+ } else {
+ handler->handleSetActiveState(bucket, newState, std::move(resultHandler));
+ }
}
- return *futureResult.get();
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 0aeb3e16351..e131cb13ae1 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -103,7 +103,7 @@ public:
Result initialize() override;
BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
Result setClusterState(BucketSpace bucketSpace, const ClusterState& calc) override;
- Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override;
+ void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override;
void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override;
diff --git a/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp b/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp
index d6602e18c81..c15be9336fe 100644
--- a/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/buckethandler.cpp
@@ -98,10 +98,10 @@ BucketHandler::handleListBuckets(IBucketIdListResultHandler &resultHandler)
void
BucketHandler::handleSetCurrentState(const BucketId &bucketId,
storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler)
+ std::shared_ptr<IGenericResultHandler> resultHandlerSP)
{
- _executor.execute(makeLambdaTask([this, bucketId, newState, resultHandlerP = &resultHandler]() {
- performSetCurrentState(bucketId, newState, resultHandlerP);
+ _executor.execute(makeLambdaTask([this, bucketId, newState, resultHandler = std::move(resultHandlerSP)]() {
+ performSetCurrentState(bucketId, newState, resultHandler.get());
}));
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/buckethandler.h b/searchcore/src/vespa/searchcore/proton/server/buckethandler.h
index 927744e1b8e..7f44d2ebd71 100644
--- a/searchcore/src/vespa/searchcore/proton/server/buckethandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/buckethandler.h
@@ -55,7 +55,7 @@ public:
void handleListBuckets(IBucketIdListResultHandler &resultHandler);
void handleSetCurrentState(const document::BucketId &bucketId,
storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler);
+ std::shared_ptr<IGenericResultHandler> resultHandler);
void handleGetBucketInfo(const storage::spi::Bucket &bucket,
IBucketInfoResultHandler &resultHandler);
void handleListActiveBuckets(IBucketIdListResultHandler &resultHandler);
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
index 3d464cced5b..bec9197501b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp
@@ -69,9 +69,9 @@ PersistenceHandlerProxy::handleSetClusterState(const storage::spi::ClusterState
void
PersistenceHandlerProxy::handleSetActiveState(const storage::spi::Bucket &bucket,
storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler)
+ std::shared_ptr<IGenericResultHandler> resultHandler)
{
- _bucketHandler.handleSetCurrentState(bucket.getBucketId().stripUnused(), newState, resultHandler);
+ _bucketHandler.handleSetCurrentState(bucket.getBucketId().stripUnused(), newState, std::move(resultHandler));
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
index 96bfbe18423..f4d6175391c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h
@@ -40,7 +40,7 @@ public:
void handleSetClusterState(const storage::spi::ClusterState &calc, IGenericResultHandler &resultHandler) override;
void handleSetActiveState(const storage::spi::Bucket &bucket, storage::spi::BucketInfo::ActiveState newState,
- IGenericResultHandler &resultHandler) override;
+ std::shared_ptr<IGenericResultHandler> resultHandler) override;
void handleGetBucketInfo(const storage::spi::Bucket &bucket, IBucketInfoResultHandler &resultHandler) override;
void handleCreateBucket(FeedToken token, const storage::spi::Bucket &bucket) override;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 4a28e650fac..d2bfabd2950 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -3,11 +3,16 @@
#include "asynchandler.h"
#include "persistenceutil.h"
#include "testandsethelper.h"
+#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".storage.persistence.asynchandler");
+
namespace storage {
namespace {
@@ -88,10 +93,12 @@ private:
}
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
+ BucketOwnershipNotifier &bucketOwnershipNotifier,
vespalib::ISequencedTaskExecutor & executor,
const document::BucketIdFactory & bucketIdFactory)
: _env(env),
_spi(spi),
+ _bucketOwnershipNotifier(bucketOwnershipNotifier),
_sequencedExecutor(executor),
_bucketIdFactory(bucketIdFactory)
{}
@@ -135,6 +142,39 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
+{
+ trackerUP->setMetric(_env._metrics.setBucketStates);
+
+ //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
+ spi::Bucket bucket(cmd.getBucket());
+ bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
+ spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
+
+ auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket,
+ notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable {
+ if (tracker->checkForError(*response)) {
+ StorBucketDatabase::WrappedEntry
+ entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(bucket.getBucketId(),
+ "handleSetBucketState");
+ if (entry.exist()) {
+ entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
+ notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
+ entry.write();
+ } else {
+ LOG(warning, "Got OK setCurrentState result from provider for %s, "
+ "but bucket has disappeared from service layer database",
+ cmd.getBucketId().toString().c_str());
+ }
+ tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
+ }
+ tracker->sendReply();
+ });
+ _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return trackerUP;
+}
+
+MessageTracker::UP
AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const
{
MessageTracker & tracker = *trackerUP;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 23f3605dca1..bf37becb2c3 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -14,6 +14,7 @@ namespace spi {
class Context;
}
class PersistenceUtil;
+class BucketOwnershipNotifier;
/**
* Handle async operations that uses a sequenced executor.
@@ -21,12 +22,13 @@ class PersistenceUtil;
*/
class AsyncHandler : public Types {
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor,
- const document::BucketIdFactory & bucketIdFactory);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &,
+ vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
@@ -34,6 +36,7 @@ private:
spi::Context & context, bool missingDocumentImpliesMatch = false) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
+ BucketOwnershipNotifier & _bucketOwnershipNotifier;
vespalib::ISequencedTaskExecutor & _sequencedExecutor;
const document::BucketIdFactory & _bucketIdFactory;
};
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 5315d3ec0bc..3d9b359f506 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -20,7 +20,7 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
_mergeHandler(_env, provider, component.cluster_context(), _clock,
cfg.bucketMergeChunkSize,
cfg.commonMergeChainOptimalizationMinimumSize),
- _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
+ _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider)
{
@@ -62,7 +62,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::APPLYBUCKETDIFF_ID:
return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::SETBUCKETSTATE_ID:
- return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 471d3d62a35..ab5066576fd 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste
return checkResult(_impl.setClusterState(bucketSpace, state));
}
-spi::Result
-ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState)
+void
+ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.setActiveState(bucket, newState));
+ onComplete->addResultHandler(this);
+ _impl.setActiveStateAsync(bucket, newState, std::move(onComplete));
}
spi::BucketInfoResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index d23cce9172a..9361cd1d19d 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -41,7 +41,7 @@ public:
spi::Result initialize() override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
- spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override;
+ void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
index 0856f45c3ff..d5b44cc1911 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
@@ -5,7 +5,6 @@
#include "bucketownershipnotifier.h"
#include "splitbitdetector.h"
#include "messages.h"
-#include <vespa/storage/common/bucketmessages.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storageapi/message/bucket.h>
@@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker
}
MessageTracker::UP
-SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.setBucketStates);
- NotificationGuard notifyGuard(_bucketOwnershipNotifier);
-
- LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
- spi::Bucket bucket(cmd.getBucket());
- bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
- spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
-
- spi::Result result(_spi.setActiveState(bucket, newState));
- if (tracker->checkForError(result)) {
- StorBucketDatabase::WrappedEntry
- entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState");
- if (entry.exist()) {
- entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
- notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
- entry.write();
- } else {
- LOG(warning, "Got OK setCurrentState result from provider for %s, "
- "but bucket has disappeared from service layer database",
- cmd.getBucketId().toString().c_str());
- }
-
- tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
- }
-
- return tracker;
-}
-
-MessageTracker::UP
SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.recheckBucketInfo);
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h
index ddfa22b154c..4521e520ee9 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.h
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h
@@ -21,7 +21,6 @@ public:
SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &,
BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization);
MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const;
private: