aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/vespa
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/vespa')
-rw-r--r--storage/src/vespa/storage/distributor/idealstatemetricsset.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp4
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp14
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h2
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp119
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.h9
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp1
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp63
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.h6
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.cpp18
-rw-r--r--storage/src/vespa/storage/persistence/persistencehandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp40
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h9
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.cpp74
-rw-r--r--storage/src/vespa/storage/persistence/simplemessagehandler.h2
-rw-r--r--storage/src/vespa/storage/persistence/splitjoinhandler.cpp32
-rw-r--r--storage/src/vespa/storage/persistence/splitjoinhandler.h1
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h3
29 files changed, 225 insertions, 213 deletions
diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp
index d3096e6864e..618e49c4238 100644
--- a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp
@@ -39,13 +39,13 @@ MergeBucketMetricSet::MergeBucketMetricSet(const std::string& name, metrics::Met
: OperationMetricSet(name, std::move(tags), description, owner),
source_only_copy_changed("source_only_copy_changed",
{{"logdefault"},{"yamasdefault"}},
- "The number of merge operations where source-only copy changed"),
+ "The number of merge operations where source-only copy changed", this),
source_only_copy_delete_blocked("source_only_copy_delete_blocked",
{{"logdefault"},{"yamasdefault"}},
- "The number of merge operations where delete of unchanged source-only copies was blocked"),
+ "The number of merge operations where delete of unchanged source-only copies was blocked", this),
source_only_copy_delete_failed("source_only_copy_delete_failed",
{{"logdefault"},{"yamasdefault"}},
- "The number of merge operations where delete of unchanged source-only copies failed")
+ "The number of merge operations where delete of unchanged source-only copies failed", this)
{
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index fbe1c142b09..7f66d1effd5 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -86,7 +86,7 @@ void GarbageCollectionOperation::update_gc_metrics() {
}
bool
-GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const {
+GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const {
return true;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 2e010a61bde..f51739242b7 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -21,7 +21,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "garbagecollection"; };
Type getType() const override { return GARBAGE_COLLECTION; }
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index b1231fafcd9..744b24b593e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -138,8 +138,7 @@ public:
bool check(uint32_t messageType, uint16_t node, uint8_t priority) override
{
- (void) node;
- if (op.shouldBlockThisOperation(messageType, priority)) {
+ if (op.shouldBlockThisOperation(messageType, node, priority)) {
blocked = true;
return false;
}
@@ -232,6 +231,7 @@ IdealStateOperation::toString() const
bool
IdealStateOperation::shouldBlockThisOperation(uint32_t messageType,
+ [[maybe_unused]] uint16_t node,
uint8_t) const
{
for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index d4dc4e405df..f8f35afe821 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -217,7 +217,7 @@ public:
/**
* Should return true if the given message type should block this operation.
*/
- virtual bool shouldBlockThisOperation(uint32_t messageType, uint8_t priority) const;
+ virtual bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t priority) const;
protected:
friend struct IdealStateManagerTest;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 7cfe4172b2c..f951a880e5d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -329,14 +329,14 @@ constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{
}
-bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const {
+bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const {
for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) {
if (messageType == blocking_type) {
return true;
}
}
- return IdealStateOperation::shouldBlockThisOperation(messageType, pri);
+ return IdealStateOperation::shouldBlockThisOperation(messageType, node, pri);
}
bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 1bca1f7389f..832c0f99681 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -48,7 +48,7 @@ public:
const document::BucketId&, MergeLimiter&,
std::vector<MergeMetaData>&);
- bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override;
+ bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override;
bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override;
private:
static void addIdealNodes(
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 9a57722dc7e..25cae5b9979 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -61,8 +61,7 @@ RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender)
bool
RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg)
{
- api::DeleteBucketReply* rep =
- dynamic_cast<api::DeleteBucketReply*>(msg.get());
+ auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get());
uint16_t node = _tracker.handleReply(*rep);
@@ -112,8 +111,15 @@ RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::sha
}
bool
-RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint8_t) const
+RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const
{
- return true;
+ // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded
+ // number in the worst case), so a simple linear scan suffices.
+ for (uint16_t node : getNodes()) {
+ if (target_node == node) {
+ return true;
+ }
+ }
+ return false;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
index a0d496f948a..5e0922d5685 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
@@ -30,7 +30,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "remove"; };
Type getType() const override { return DELETE_BUCKET; }
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 649503cf0f5..6f3924535ef 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -150,6 +150,7 @@ SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const Op
bool
SplitOperation::shouldBlockThisOperation(uint32_t msgType,
+ [[maybe_unused]] uint16_t node,
uint8_t pri) const
{
if (msgType == api::MessageType::SPLITBUCKET_ID && _priority >= pri) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
index ee957309088..6a268155fc8 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
@@ -21,7 +21,7 @@ public:
const char* getName() const override { return "split"; };
Type getType() const override { return SPLIT_BUCKET; }
bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override;
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 4a28e650fac..d150f5600e5 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -3,11 +3,16 @@
#include "asynchandler.h"
#include "persistenceutil.h"
#include "testandsethelper.h"
+#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".storage.persistence.asynchandler");
+
namespace storage {
namespace {
@@ -86,12 +91,26 @@ private:
vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
};
+bool
+bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
+ // Don't check document sizes, as background moving of documents in Proton
+ // may trigger a change in size without any mutations taking place. This will
+ // only take place when a document being moved was fed _prior_ to the change
+ // where Proton starts reporting actual document sizes, and will eventually
+ // converge to a stable value. But for now, ignore it to prevent false positive
+ // error logs and non-deleted buckets.
+ return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
}
+
+}
+
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
+ BucketOwnershipNotifier &bucketOwnershipNotifier,
vespalib::ISequencedTaskExecutor & executor,
const document::BucketIdFactory & bucketIdFactory)
: _env(env),
_spi(spi),
+ _bucketOwnershipNotifier(bucketOwnershipNotifier),
_sequencedExecutor(executor),
_bucketIdFactory(bucketIdFactory)
{}
@@ -135,6 +154,79 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
+ return tracker;
+ }
+
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) {
+ // TODO Even if an non OK response can not be handled sanely we might probably log a message, or increment a metric
+ (void) ignored;
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
+ if (entry.exist() && entry->getMetaCount() > 0) {
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ bucket.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+ tracker->sendReply();
+ });
+ _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return tracker;
+}
+
+MessageTracker::UP
+AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
+{
+ trackerUP->setMetric(_env._metrics.setBucketStates);
+
+ //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
+ spi::Bucket bucket(cmd.getBucket());
+ bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
+ spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
+
+ auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket,
+ notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable {
+ if (tracker->checkForError(*response)) {
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState");
+ if (entry.exist()) {
+ entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
+ notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
+ entry.write();
+ } else {
+ LOG(warning, "Got OK setCurrentState result from provider for %s, "
+ "but bucket has disappeared from service layer database",
+ cmd.getBucketId().toString().c_str());
+ }
+ tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
+ }
+ tracker->sendReply();
+ });
+ _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return trackerUP;
+}
+
+MessageTracker::UP
AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const
{
MessageTracker & tracker = *trackerUP;
@@ -233,4 +325,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra
return true;
}
+bool
+AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
+{
+ spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
+ if (result.hasError()) {
+ LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
+ bucket.toString().c_str(), result.getErrorMessage().c_str());
+ return false;
+ }
+ api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
+ // Don't check meta fields or active/ready fields since these are not
+ // that important and ready may change under the hood in a race with
+ // getModifiedBuckets(). If bucket is empty it means it has already
+ // been deleted by a racing split/join.
+ if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
+ LOG(error,
+ "Service layer bucket database and provider out of sync before "
+ "deleting bucket %s! Service layer db had %s while provider says "
+ "bucket has %s. Deletion has been rejected to ensure data is not "
+ "lost, but bucket may remain out of sync until service has been "
+ "restarted.",
+ bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
+ return false;
+ }
+ return true;
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 23f3605dca1..4f5c242570c 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -14,6 +14,7 @@ namespace spi {
class Context;
}
class PersistenceUtil;
+class BucketOwnershipNotifier;
/**
* Handle async operations that uses a sequenced executor.
@@ -21,19 +22,23 @@ class PersistenceUtil;
*/
class AsyncHandler : public Types {
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor,
- const document::BucketIdFactory & bucketIdFactory);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &,
+ vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch = false) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
+ BucketOwnershipNotifier & _bucketOwnershipNotifier;
vespalib::ISequencedTaskExecutor & _sequencedExecutor;
const document::BucketIdFactory & _bucketIdFactory;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 0dcb8539bff..70ed9845cb0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -200,13 +200,13 @@ public:
virtual void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) = 0;
/**
- * Returns the reference to the current merge status for the given bucket.
+ * Returns a shared pointer to the current merge status for the given bucket.
* This allows unlocked access to an internal variable, so users should
* first check that noone else is using it by calling isMerging() first.
*
* @param bucket The bucket to start merging.
*/
- virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0;
+ virtual std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket& bucket) = 0;
/**
* Returns true if the bucket is currently being merged on this node.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 62f9fa12a21..e395a7df9e0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -86,7 +86,7 @@ FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, std::shared_
_mergeStates[bucket] = status;
}
-MergeStatus&
+std::shared_ptr<MergeStatus>
FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
{
std::lock_guard mlock(_mergeStatesLock);
@@ -94,7 +94,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
if ( ! status ) {
throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC);
}
- return *status;
+ return status;
}
bool
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index a3dc316cdde..5f212b18a7f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -218,7 +218,7 @@ public:
}
void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) override;
- MergeStatus& editMergeStatus(const document::Bucket&) override;
+ std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket&) override;
bool isMerging(const document::Bucket&) const override;
void clearMergeStatus(const document::Bucket& bucket) override;
void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index ddcab0f3659..2ffb827accf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -10,7 +10,6 @@
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/common/messagebucket.h>
-#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/persistence/bucketownershipnotifier.h>
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/persistencehandler.h>
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7693156ae30..963fddd9fb5 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -23,13 +23,15 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
uint32_t maxChunkSize,
- uint32_t commonMergeChainOptimalizationMinimumSize)
+ uint32_t commonMergeChainOptimalizationMinimumSize,
+ bool async_apply_bucket_diff)
: _clock(clock),
_cluster_context(cluster_context),
_env(env),
_spi(spi),
_maxChunkSize(maxChunkSize),
- _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize)
+ _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
+ _async_apply_bucket_diff(async_apply_bucket_diff)
{
}
@@ -1136,49 +1138,49 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
- if (s.pendingId != reply.getMsgId()) {
+ auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
+ if (s->pendingId != reply.getMsgId()) {
LOG(warning, "Got GetBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
- bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
+ bucket.toString().c_str(), reply.getMsgId(), s->pendingId);
return;
}
api::StorageReply::SP replyToSend;
bool clearState = true;
try {
- if (s.isFirstNode()) {
+ if (s->isFirstNode()) {
if (reply.getResult().failed()) {
// We failed, so we should reply to the pending message.
- replyToSend = s.reply;
+ replyToSend = s->reply;
} else {
// If we didn't fail, reply should have good content
// Sanity check for nodes
assert(reply.getNodes().size() >= 2);
// Get bucket diff should retrieve all info at once
- assert(s.diff.size() == 0);
- s.diff.insert(s.diff.end(),
+ assert(s->diff.size() == 0);
+ s->diff.insert(s->diff.end(),
reply.getDiff().begin(),
reply.getDiff().end());
- replyToSend = processBucketMerge(bucket, s, sender, s.context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context);
if (!replyToSend.get()) {
// We have sent something on, and shouldn't reply now.
clearState = false;
} else {
_env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(
- s.startTime.getElapsedTimeAsDouble());
+ s->startTime.getElapsedTimeAsDouble());
}
}
} else {
// Exists in send on list, send on!
- replyToSend = s.pendingGetDiff;
+ replyToSend = s->pendingGetDiff;
LOG(spam, "Received GetBucketDiffReply for %s with diff of "
"size %zu. Sending it on.",
bucket.toString().c_str(), reply.getDiff().size());
- s.pendingGetDiff->getDiff().swap(reply.getDiff());
+ s->pendingGetDiff->getDiff().swap(reply.getDiff());
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
@@ -1282,8 +1284,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const
{
+ (void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
ApplyBucketDiffState async_results(*this, bucket);
@@ -1296,11 +1299,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
- if (s.pendingId != reply.getMsgId()) {
+ auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
+ if (s->pendingId != reply.getMsgId()) {
LOG(warning, "Got ApplyBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
- bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
+ bucket.toString().c_str(), reply.getMsgId(), s->pendingId);
return;
}
bool clearState = true;
@@ -1315,12 +1318,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex);
if (applyDiffNeedLocalData(diff, index, false)) {
framework::MilliSecTimer startTime(_clock);
- fetchLocalData(bucket, diff, index, s.context);
+ fetchLocalData(bucket, diff, index, s->context);
_env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
- applyDiffLocally(bucket, diff, index, s.context, async_results);
+ applyDiffLocally(bucket, diff, index, s->context, async_results);
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
async_results.check();
async_results.sync_bucket_info();
@@ -1332,50 +1335,50 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
}
- if (s.isFirstNode()) {
+ if (s->isFirstNode()) {
uint16_t hasMask = 0;
for (uint16_t i=0; i<reply.getNodes().size(); ++i) {
hasMask |= (1 << i);
}
- const size_t diffSizeBefore = s.diff.size();
- const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes());
+ const size_t diffSizeBefore = s->diff.size();
+ const bool altered = s->removeFromDiff(diff, hasMask, reply.getNodes());
if (reply.getResult().success()
- && s.diff.size() == diffSizeBefore
+ && s->diff.size() == diffSizeBefore
&& !altered)
{
std::string msg(
vespalib::make_string(
"Completed merge cycle without fixing "
"any entries (merge state diff at %zu entries)",
- s.diff.size()));
+ s->diff.size()));
returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, msg);
LOG(warning,
"Got reply indicating merge cycle did not fix any entries: %s",
reply.toString(true).c_str());
LOG(warning,
"Merge state for which there was no progress across a full merge cycle: %s",
- s.toString().c_str());
+ s->toString().c_str());
}
if (returnCode.failed()) {
// Should reply now, since we failed.
- replyToSend = s.reply;
+ replyToSend = s->reply;
} else {
- replyToSend = processBucketMerge(bucket, s, sender, s.context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context);
if (!replyToSend.get()) {
// We have sent something on and shouldn't reply now.
clearState = false;
} else {
- _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s.startTime.getElapsedTimeAsDouble());
+ _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble());
}
}
} else {
- replyToSend = s.pendingApplyDiff;
+ replyToSend = s->pendingApplyDiff;
LOG(debug, "ApplyBucketDiff(%s) finished. Sending reply.",
bucket.toString().c_str());
- s.pendingApplyDiff->getDiff().swap(reply.getDiff());
+ s->pendingApplyDiff->getDiff().swap(reply.getDiff());
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index fa7e21dae78..0ff8f3c0ef8 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -45,7 +45,8 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
uint32_t maxChunkSize = 4190208,
- uint32_t commonMergeChainOptimalizationMinimumSize = 64);
+ uint32_t commonMergeChainOptimalizationMinimumSize = 64,
+ bool async_apply_bucket_diff = false);
bool buildBucketInfoList(
const spi::Bucket& bucket,
@@ -68,7 +69,7 @@ public:
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
- void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const;
+ void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
private:
const framework::Clock &_clock;
@@ -77,6 +78,7 @@ private:
spi::PersistenceProvider &_spi;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
+ const bool _async_apply_bucket_diff;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 5315d3ec0bc..1ef883fc810 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,8 +19,9 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
_processAllHandler(_env, provider),
_mergeHandler(_env, provider, component.cluster_context(), _clock,
cfg.bucketMergeChunkSize,
- cfg.commonMergeChainOptimalizationMinimumSize),
- _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
+ cfg.commonMergeChainOptimalizationMinimumSize,
+ cfg.asyncApplyBucketDiff),
+ _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider)
{
@@ -45,7 +46,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::CREATEBUCKET_ID:
return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
@@ -62,7 +63,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::APPLYBUCKETDIFF_ID:
return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::SETBUCKETSTATE_ID:
- return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
@@ -87,19 +88,20 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
return MessageTracker::UP();
}
-void
-PersistenceHandler::handleReply(api::StorageReply& reply) const
+MessageTracker::UP
+PersistenceHandler::handleReply(api::StorageReply& reply, MessageTracker::UP tracker) const
{
switch (reply.getType().getId()) {
case api::MessageType::GETBUCKETDIFF_REPLY_ID:
_mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler);
break;
case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler);
+ _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler, std::move(tracker));
break;
default:
break;
}
+ return tracker;
}
MessageTracker::UP
@@ -112,7 +114,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
try{
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- handleReply(static_cast<api::StorageReply&>(msg));
+ return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker));
} catch (std::exception& e) {
// It's a reply, so nothing we can do.
LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index a800d1d4053..a92c2dc78ca 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -38,7 +38,7 @@ public:
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
- void handleReply(api::StorageReply&) const;
+ MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const;
MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const;
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 471d3d62a35..ce424f0ce83 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste
return checkResult(_impl.setClusterState(bucketSpace, state));
}
-spi::Result
-ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState)
+void
+ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.setActiveState(bucket, newState));
+ onComplete->addResultHandler(this);
+ _impl.setActiveStateAsync(bucket, newState, std::move(onComplete));
}
spi::BucketInfoResult
@@ -72,32 +73,6 @@ ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const
return checkResult(_impl.getBucketInfo(bucket));
}
-spi::Result
-ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::DocumentSP doc, spi::Context& context)
-{
- return checkResult(_impl.put(bucket, ts, std::move(doc), context));
-}
-
-spi::RemoveResult
-ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context)
-{
- return checkResult(_impl.remove(bucket, ts, docId, context));
-}
-
-spi::RemoveResult
-ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts,
- const document::DocumentId& docId, spi::Context& context)
-{
- return checkResult(_impl.removeIfFound(bucket, ts, docId, context));
-}
-
-spi::UpdateResult
-ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts,
- spi::DocumentUpdateSP docUpdate, spi::Context& context)
-{
- return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context));
-}
-
spi::GetResult
ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
const document::DocumentId& docId, spi::Context& context) const
@@ -130,10 +105,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont
return checkResult(_impl.createBucket(bucket, context));
}
-spi::Result
-ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.deleteBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index d23cce9172a..c9d2411e372 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -41,12 +41,8 @@ public:
spi::Result initialize() override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
- spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override;
+
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
- spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
- spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult
createIterator(const spi::Bucket &bucket, FieldSetSP, const spi::Selection &, spi::IncludedVersions versions,
@@ -54,7 +50,6 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -67,6 +62,8 @@ public:
void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
+ void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4c09f09e63..b4fe207e2e5 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa
return document::FieldSet::SP();
}
-bool
-bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
- // Don't check document sizes, as background moving of documents in Proton
- // may trigger a change in size without any mutations taking place. This will
- // only take place when a document being moved was fed _prior_ to the change
- // where Proton starts reporting actual document sizes, and will eventually
- // converge to a stable value. But for now, ignore it to prevent false positive
- // error logs and non-deleted buckets.
- return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
-}
}
SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi)
: _env(env),
@@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT
return tracker;
}
-bool
-SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
-{
- spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
- if (result.hasError()) {
- LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
- bucket.toString().c_str(), result.getErrorMessage().c_str());
- return false;
- }
- api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
- // Don't check meta fields or active/ready fields since these are not
- // that important and ready may change under the hood in a race with
- // getModifiedBuckets(). If bucket is empty it means it has already
- // been deleted by a racing split/join.
- if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
- LOG(error,
- "Service layer bucket database and provider out of sync before "
- "deleting bucket %s! Service layer db had %s while provider says "
- "bucket has %s. Deletion has been rejected to ensure data is not "
- "lost, but bucket may remain out of sync until service has been "
- "restarted.",
- bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
- return false;
- }
- return true;
-}
-
-MessageTracker::UP
-SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.deleteBuckets);
- LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
- }
- spi::Bucket bucket(cmd.getBucket());
- if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
- return tracker;
- }
- _spi.deleteBucket(bucket, tracker->context());
- StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
- {
- StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
- if (entry.exist() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- cmd.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
- }
- return tracker;
-}
-
MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 9f00f67684d..2cfbc7016c0 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -23,13 +23,11 @@ public:
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const;
private:
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
};
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
index 0856f45c3ff..d5b44cc1911 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
@@ -5,7 +5,6 @@
#include "bucketownershipnotifier.h"
#include "splitbitdetector.h"
#include "messages.h"
-#include <vespa/storage/common/bucketmessages.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storageapi/message/bucket.h>
@@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker
}
MessageTracker::UP
-SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.setBucketStates);
- NotificationGuard notifyGuard(_bucketOwnershipNotifier);
-
- LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
- spi::Bucket bucket(cmd.getBucket());
- bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
- spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
-
- spi::Result result(_spi.setActiveState(bucket, newState));
- if (tracker->checkForError(result)) {
- StorBucketDatabase::WrappedEntry
- entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState");
- if (entry.exist()) {
- entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
- notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
- entry.write();
- } else {
- LOG(warning, "Got OK setCurrentState result from provider for %s, "
- "but bucket has disappeared from service layer database",
- cmd.getBucketId().toString().c_str());
- }
-
- tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
- }
-
- return tracker;
-}
-
-MessageTracker::UP
SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.recheckBucketInfo);
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h
index ddfa22b154c..4521e520ee9 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.h
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h
@@ -21,7 +21,6 @@ public:
SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &,
BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization);
MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const;
private:
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index fa9ab22c1cb..a17c77f6ca4 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -72,6 +72,7 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
: metrics::MetricSet("mergethrottler", {}, "", owner),
averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this),
queueSize("queuesize", {}, "Length of merge queue", this),
+ active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this),
bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
@@ -366,6 +367,7 @@ MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter)
LOG(debug, "Removed merge for %s from internal state",
mergeIter->first.toString().c_str());
_merges.erase(mergeIter);
+ update_active_merge_window_size_metric();
}
api::StorageMessage::SP
@@ -815,6 +817,7 @@ MergeThrottler::processNewMergeCommand(
// merge throttling window.
assert(_merges.find(mergeCmd.getBucket()) == _merges.end());
auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first;
+ update_active_merge_window_size_metric();
LOG(debug, "Added merge %s to internal state",
mergeCmd.toString().c_str());
@@ -1247,6 +1250,11 @@ MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits)
}
void
+MergeThrottler::update_active_merge_window_size_metric() noexcept {
+ _metrics->active_window_size.set(static_cast<int64_t>(_merges.size()));
+}
+
+void
MergeThrottler::print(std::ostream& out, bool /*verbose*/,
const std::string& /*indent*/) const
{
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 9b0fb125b2f..997477a4b70 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -64,6 +64,7 @@ public:
public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongValueMetric queueSize;
+ metrics::LongValueMetric active_window_size;
metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -388,6 +389,8 @@ private:
void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion);
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
+ void update_active_merge_window_size_metric() noexcept;
+
// const function, but metrics are mutable
void updateOperationMetrics(
const api::ReturnCode& result,