Diffstat (limited to 'storage/src')
-rw-r--r--  storage/src/tests/distributor/idealstatemanagertest.cpp | 4
-rw-r--r--  storage/src/tests/distributor/removebucketoperationtest.cpp | 12
-rw-r--r--  storage/src/tests/distributor/statecheckerstest.cpp | 12
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.cpp | 87
-rw-r--r--  storage/src/tests/persistence/common/persistenceproviderwrapper.h | 28
-rw-r--r--  storage/src/tests/persistence/filestorage/operationabortingtest.cpp | 10
-rw-r--r--  storage/src/tests/persistence/mergehandlertest.cpp | 80
-rw-r--r--  storage/src/tests/storageserver/mergethrottlertest.cpp | 5
-rw-r--r--  storage/src/vespa/storage/distributor/idealstatemetricsset.cpp | 6
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp | 2
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h | 2
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp | 4
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h | 2
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp | 4
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h | 2
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp | 14
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h | 2
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp | 1
-rw-r--r--  storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h | 2
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.cpp | 119
-rw-r--r--  storage/src/vespa/storage/persistence/asynchandler.h | 9
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandler.h | 4
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp | 4
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h | 2
-rw-r--r--  storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp | 1
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.cpp | 63
-rw-r--r--  storage/src/vespa/storage/persistence/mergehandler.h | 6
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.cpp | 18
-rw-r--r--  storage/src/vespa/storage/persistence/persistencehandler.h | 2
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.cpp | 40
-rw-r--r--  storage/src/vespa/storage/persistence/provider_error_wrapper.h | 9
-rw-r--r--  storage/src/vespa/storage/persistence/simplemessagehandler.cpp | 74
-rw-r--r--  storage/src/vespa/storage/persistence/simplemessagehandler.h | 2
-rw-r--r--  storage/src/vespa/storage/persistence/splitjoinhandler.cpp | 32
-rw-r--r--  storage/src/vespa/storage/persistence/splitjoinhandler.h | 1
-rw-r--r--  storage/src/vespa/storage/storageserver/mergethrottler.cpp | 8
-rw-r--r--  storage/src/vespa/storage/storageserver/mergethrottler.h | 3
37 files changed, 370 insertions, 306 deletions
diff --git a/storage/src/tests/distributor/idealstatemanagertest.cpp b/storage/src/tests/distributor/idealstatemanagertest.cpp
index d521975e0cb..a1263c9433b 100644
--- a/storage/src/tests/distributor/idealstatemanagertest.cpp
+++ b/storage/src/tests/distributor/idealstatemanagertest.cpp
@@ -220,8 +220,8 @@ TEST_F(IdealStateManagerTest, block_check_for_all_operations_to_specific_bucket)
pending_message_tracker().insert(msg);
}
{
- RemoveBucketOperation op(dummy_cluster_context,
- BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(7)));
+ // TODO we might not want this particular behavior for merge operations either
+ MergeOperation op(BucketAndNodes(makeDocumentBucket(bid), toVector<uint16_t>(2, 3)));
// Not blocked for exact node match.
EXPECT_FALSE(checkBlock(op, makeDocumentBucket(bid), operation_context(), op_seq));
// But blocked for bucket match!
diff --git a/storage/src/tests/distributor/removebucketoperationtest.cpp b/storage/src/tests/distributor/removebucketoperationtest.cpp
index e877f4601b7..971ff36c833 100644
--- a/storage/src/tests/distributor/removebucketoperationtest.cpp
+++ b/storage/src/tests/distributor/removebucketoperationtest.cpp
@@ -119,4 +119,16 @@ TEST_F(RemoveBucketOperationTest, fail_with_invalid_bucket_info) {
EXPECT_EQ("NONEXISTING", dumpBucket(document::BucketId(16, 1)));
}
+TEST_F(RemoveBucketOperationTest, operation_blocked_when_pending_message_to_target_node) {
+ RemoveBucketOperation op(dummy_cluster_context,
+ BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ toVector<uint16_t>(1, 3)));
+ // In node target set
+ EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 1, 120));
+ EXPECT_TRUE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 3, 120));
+ // Not in node target set
+ EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 0, 120));
+ EXPECT_FALSE(op.shouldBlockThisOperation(api::MessageType::PUT_ID, 2, 120));
+}
+
} // storage::distributor
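
Note on the new test: it exercises the node-aware blocking signature introduced further down in this change, where shouldBlockThisOperation() also receives the node the pending message is addressed to. The intended semantics for DeleteBucket, blocking only when the pending message targets one of the operation's own nodes, reduce to a membership check; the standalone sketch below (hypothetical names, not the production class) mirrors the expectations asserted above.

    #include <cstdint>
    #include <vector>

    // Illustrative helper only: a DeleteBucket-style operation should be blocked
    // solely by pending messages addressed to one of the nodes it will touch itself.
    bool should_block_for_node(const std::vector<uint16_t>& target_nodes, uint16_t pending_node) {
        for (uint16_t node : target_nodes) {   // node sets are tiny, so a linear scan is fine
            if (node == pending_node) {
                return true;
            }
        }
        return false;
    }

    // With target nodes {1, 3}: messages to node 1 or 3 block, messages to node 0 or 2
    // do not, matching the EXPECT_TRUE/EXPECT_FALSE pairs in the test above.
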
diff --git a/storage/src/tests/distributor/statecheckerstest.cpp b/storage/src/tests/distributor/statecheckerstest.cpp
index 25009156f18..f5531a134d0 100644
--- a/storage/src/tests/distributor/statecheckerstest.cpp
+++ b/storage/src/tests/distributor/statecheckerstest.cpp
@@ -38,14 +38,15 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
struct PendingMessage
{
uint32_t _msgType;
+ uint16_t _node;
uint8_t _pri;
- PendingMessage() : _msgType(UINT32_MAX), _pri(0) {}
+ constexpr PendingMessage() noexcept : _msgType(UINT32_MAX), _node(0), _pri(0) {}
- PendingMessage(uint32_t msgType, uint8_t pri)
- : _msgType(msgType), _pri(pri) {}
+ constexpr PendingMessage(uint32_t msgType, uint8_t pri) noexcept
+ : _msgType(msgType), _node(0), _pri(pri) {}
- bool shouldCheck() const { return _msgType != UINT32_MAX; }
+ bool shouldCheck() const noexcept { return _msgType != UINT32_MAX; }
};
void enableClusterState(const lib::ClusterState& systemState) {
@@ -97,8 +98,7 @@ struct StateCheckersTest : Test, DistributorStripeTestUtil {
IdealStateOperation::UP op(result.createOperation());
if (op.get()) {
if (blocker.shouldCheck()
- && op->shouldBlockThisOperation(blocker._msgType,
- blocker._pri))
+ && op->shouldBlockThisOperation(blocker._msgType, blocker._node, blocker._pri))
{
return "BLOCKED";
}
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index a174d305c27..b3bd1c6a253 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -22,6 +22,15 @@
} \
}
+#define CHECK_ERROR_ASYNC(className, failType, onError) \
+ { \
+ Guard guard(_lock); \
+ if (_result.getErrorCode() != spi::Result::ErrorType::NONE && (_failureMask & (failType))) { \
+ onError->onComplete(std::make_unique<className>(_result.getErrorCode(), _result.getErrorMessage())); \
+ return; \
+ } \
+ }
+
namespace storage {
namespace {
@@ -87,46 +96,40 @@ PersistenceProviderWrapper::getBucketInfo(const spi::Bucket& bucket) const
return _spi.getBucketInfo(bucket);
}
-spi::Result
-PersistenceProviderWrapper::put(const spi::Bucket& bucket, spi::Timestamp timestamp,
- document::Document::SP doc, spi::Context& context)
+void
+PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::Document::SP doc,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("put(" << bucket << ", " << timestamp << ", " << doc->getId() << ")");
- CHECK_ERROR(spi::Result, FAIL_PUT);
- return _spi.put(bucket, timestamp, std::move(doc), context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_PUT, onComplete);
+ _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(onComplete));
}
-spi::RemoveResult
-PersistenceProviderWrapper::remove(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const spi::DocumentId& id,
- spi::Context& context)
+void
+PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("remove(" << bucket << ", " << timestamp << ", " << id << ")");
- CHECK_ERROR(spi::RemoveResult, FAIL_REMOVE);
- return _spi.remove(bucket, timestamp, id, context);
+ CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete);
+ _spi.removeAsync(bucket, timestamp, id, context, std::move(onComplete));
}
-spi::RemoveResult
-PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- const spi::DocumentId& id,
- spi::Context& context)
+void
+PersistenceProviderWrapper::removeIfFoundAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("removeIfFound(" << bucket << ", " << timestamp << ", " << id << ")");
- CHECK_ERROR(spi::RemoveResult, FAIL_REMOVE_IF_FOUND);
- return _spi.removeIfFound(bucket, timestamp, id, context);
+ CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE_IF_FOUND, onComplete);
+ _spi.removeIfFoundAsync(bucket, timestamp, id, context, std::move(onComplete));
}
-spi::UpdateResult
-PersistenceProviderWrapper::update(const spi::Bucket& bucket,
- spi::Timestamp timestamp,
- document::DocumentUpdate::SP upd,
- spi::Context& context)
+void
+PersistenceProviderWrapper::updateAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, document::DocumentUpdate::SP upd,
+ spi::Context& context, spi::OperationComplete::UP onComplete)
{
LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")");
- CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE);
- return _spi.update(bucket, timestamp, std::move(upd), context);
+ CHECK_ERROR_ASYNC(spi::UpdateResult, FAIL_UPDATE, onComplete);
+ _spi.updateAsync(bucket, timestamp, std::move(upd), context, std::move(onComplete));
}
spi::GetResult
@@ -172,13 +175,13 @@ PersistenceProviderWrapper::destroyIterator(spi::IteratorId iterId,
return _spi.destroyIterator(iterId, context);
}
-spi::Result
-PersistenceProviderWrapper::deleteBucket(const spi::Bucket& bucket,
- spi::Context& context)
+void
+PersistenceProviderWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context,
+ spi::OperationComplete::UP operationComplete)
{
LOG_SPI("deleteBucket(" << bucket << ")");
- CHECK_ERROR(spi::Result, FAIL_DELETE_BUCKET);
- return _spi.deleteBucket(bucket, context);
+ CHECK_ERROR_ASYNC(spi::Result, FAIL_DELETE_BUCKET, operationComplete);
+ _spi.deleteBucketAsync(bucket, context, std::move(operationComplete));
}
spi::Result
@@ -225,4 +228,26 @@ PersistenceProviderWrapper::removeEntry(const spi::Bucket& bucket,
return _spi.removeEntry(bucket, timestamp, context);
}
+spi::Result
+PersistenceProviderWrapper::initialize() {
+ return _spi.initialize();
+}
+
+spi::BucketIdListResult
+PersistenceProviderWrapper::getModifiedBuckets(spi::PersistenceProvider::BucketSpace bucketSpace) const {
+ return _spi.getModifiedBuckets(bucketSpace);
+}
+
+spi::Result
+PersistenceProviderWrapper::setClusterState(spi::PersistenceProvider::BucketSpace bucketSpace, const spi::ClusterState &state) {
+ return _spi.setClusterState(bucketSpace, state);
+}
+
+void
+PersistenceProviderWrapper::setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
+ spi::OperationComplete::UP onComplete)
+{
+ _spi.setActiveStateAsync(bucket, state, std::move(onComplete));
+}
+
}
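
The wrapper conversions above all follow the same shape: each synchronous call returning spi::Result becomes an ...Async call that receives a completion handler, and CHECK_ERROR_ASYNC short-circuits by delivering the injected error through that handler instead of returning it. The following is a minimal, self-contained sketch of that pattern, using invented stand-in types rather than the actual SPI classes and assuming a single injected error code.

    #include <memory>
    #include <string>
    #include <utility>

    // Hypothetical stand-ins for spi::Result and spi::OperationComplete.
    struct Result {
        int error_code = 0;          // 0 == success
        std::string error_message;
    };

    struct OperationComplete {
        virtual ~OperationComplete() = default;
        virtual void onComplete(std::unique_ptr<Result> result) = 0;
    };

    struct Provider {
        virtual ~Provider() = default;
        // Async put: ownership of the completion handler moves to the provider.
        virtual void putAsync(std::string doc, std::unique_ptr<OperationComplete> on_done) = 0;
    };

    // Wrapper that either injects a configured failure or forwards the call,
    // mirroring what CHECK_ERROR_ASYNC does for each operation above.
    class FailingWrapper : public Provider {
    public:
        FailingWrapper(Provider& inner, int fail_code, std::string fail_msg)
            : _inner(inner), _fail_code(fail_code), _fail_msg(std::move(fail_msg)) {}

        void putAsync(std::string doc, std::unique_ptr<OperationComplete> on_done) override {
            if (_fail_code != 0) {
                // Error path: complete immediately with the injected failure and return.
                on_done->onComplete(std::make_unique<Result>(Result{_fail_code, _fail_msg}));
                return;
            }
            // Success path: hand the document and the completion handler on unchanged.
            _inner.putAsync(std::move(doc), std::move(on_done));
        }
    private:
        Provider&   _inner;
        int         _fail_code;
        std::string _fail_msg;
    };
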
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index d90fa7b2eaa..c6628814dba 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -21,7 +21,7 @@
namespace storage {
-class PersistenceProviderWrapper : public spi::AbstractPersistenceProvider
+class PersistenceProviderWrapper : public spi::PersistenceProvider
{
public:
enum OPERATION_FAILURE_FLAGS
@@ -47,11 +47,11 @@ public:
// TODO: add more as needed
};
private:
- spi::PersistenceProvider& _spi;
- spi::Result _result;
- mutable std::mutex _lock;
+ spi::PersistenceProvider& _spi;
+ spi::Result _result;
+ mutable std::mutex _lock;
mutable std::vector<std::string> _log;
- uint32_t _failureMask;
+ uint32_t _failureMask;
using Guard = std::lock_guard<std::mutex>;
public:
PersistenceProviderWrapper(spi::PersistenceProvider& spi);
@@ -88,13 +88,21 @@ public:
_log.clear();
}
+ spi::Result initialize() override;
+ spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
+
+ spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState &state) override;
+
+ void setActiveStateAsync(const spi::Bucket &bucket, spi::BucketInfo::ActiveState state,
+ spi::OperationComplete::UP up) override;
+
spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
- spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
- spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
+ void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
+ void removeAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult
@@ -103,7 +111,7 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1,
const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2,
diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
index c51357cacd1..07d2b24d536 100644
--- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
+++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp
@@ -46,14 +46,15 @@ public:
_deleteBucketInvocations(0)
{}
- spi::Result put(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&) override
+ void
+ putAsync(const spi::Bucket&, spi::Timestamp, document::Document::SP, spi::Context&, spi::OperationComplete::UP onComplete) override
{
_queueBarrier.await();
// message abort stage with active opertion in disk queue
std::this_thread::sleep_for(75ms);
_completionBarrier.await();
// test finished
- return spi::Result();
+ onComplete->onComplete(std::make_unique<spi::Result>());
}
spi::BucketInfoResult getBucketInfo(const spi::Bucket& bucket) const override {
@@ -66,9 +67,10 @@ public:
return PersistenceProviderWrapper::createBucket(bucket, ctx);
}
- spi::Result deleteBucket(const spi::Bucket& bucket, spi::Context& ctx) override {
+ void
+ deleteBucketAsync(const spi::Bucket& bucket, spi::Context& ctx, spi::OperationComplete::UP onComplete) override {
++_deleteBucketInvocations;
- return PersistenceProviderWrapper::deleteBucket(bucket, ctx);
+ PersistenceProviderWrapper::deleteBucketAsync(bucket, ctx, std::move(onComplete));
}
};
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index fa989410137..60030004594 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -304,10 +304,10 @@ MergeHandlerTest::testApplyBucketDiffChain(bool midChain)
EXPECT_FALSE(replySent.get());
LOG(debug, "Verifying that replying the diff sends on back");
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd2);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd2);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply, stub);
+ handler.handleApplyBucketDiffReply(*reply, stub, createTracker(reply, _bucket));
ASSERT_EQ(1, stub.replies.size());
replySent = stub.replies[0];
}
@@ -353,12 +353,12 @@ TEST_F(MergeHandlerTest, master_message_flow) {
ASSERT_EQ(2, messageKeeper()._msgs.size());
ASSERT_EQ(api::MessageType::APPLYBUCKETDIFF, messageKeeper()._msgs[1]->getType());
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
- auto reply2 = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply2 = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
ASSERT_EQ(1, reply2->getDiff().size());
reply2->getDiff()[0]._entry._hasMask |= 2u;
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*reply2, stub);
+ handler.handleApplyBucketDiffReply(*reply2, stub, createTracker(reply2, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -470,9 +470,9 @@ TEST_F(MergeHandlerTest, chunked_apply_bucket_diff) {
}
}
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
{
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, messageKeeper(), createTracker(applyBucketDiffReply, _bucket));
if (!messageKeeper()._msgs.empty()) {
ASSERT_FALSE(reply.get());
@@ -672,10 +672,10 @@ TEST_F(MergeHandlerTest, merge_progress_safe_guard) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
MessageSenderStub stub;
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.replies.size());
@@ -699,14 +699,14 @@ TEST_F(MergeHandlerTest, safe_guard_not_invoked_when_has_mask_changes) {
handler.handleGetBucketDiffReply(*getBucketDiffReply, messageKeeper());
auto applyBucketDiffCmd = fetchSingleMessage<api::ApplyBucketDiffCommand>();
- auto applyBucketDiffReply = std::make_unique<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
+ auto applyBucketDiffReply = std::make_shared<api::ApplyBucketDiffReply>(*applyBucketDiffCmd);
ASSERT_FALSE(applyBucketDiffReply->getDiff().empty());
// Change a hasMask to indicate something changed during merging.
applyBucketDiffReply->getDiff()[0]._entry._hasMask = 0x5;
MessageSenderStub stub;
LOG(debug, "sending apply bucket diff reply");
- handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub);
+ handler.handleApplyBucketDiffReply(*applyBucketDiffReply, stub, createTracker(applyBucketDiffReply, _bucket));
ASSERT_EQ(1, stub.commands.size());
@@ -1012,10 +1012,10 @@ MergeHandlerTest::HandleApplyBucketDiffReplyInvoker::invoke(
spi::Context&)
{
(void) test;
- api::ApplyBucketDiffReply reply(*_applyCmd);
- test.fillDummyApplyDiff(reply.getDiff());
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(*_applyCmd);
+ test.fillDummyApplyDiff(reply->getDiff());
_stub.clear();
- handler.handleApplyBucketDiffReply(reply, _stub);
+ handler.handleApplyBucketDiffReply(*reply, _stub, test.createTracker(reply, test._bucket));
}
std::string
@@ -1275,8 +1275,8 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
EXPECT_EQ(1, cmd2.getAddress()->getIndex());
EXPECT_EQ(1234, cmd2.getSourceIndex());
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList);
baseline_diff_size = cmd2.getDiff().size();
auto reply = std::make_unique<api::GetBucketDiffReply>(cmd2);
auto &diff = reply->getDiff();
@@ -1292,16 +1292,16 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking first ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// Node 4 has been eliminated before the first ApplyBucketDiff command
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 2u, s.diff.size());
- EXPECT_EQ(EntryCheck(20000, 24u), s.diff[baseline_diff_size]);
- EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size + 1]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 2u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20000, 24u), s->diff[baseline_diff_size]);
+ EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size + 1]);
auto& cmd3 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[1]);
// ApplyBucketDiffCommand has a shorter node list, node 2 is not present
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd3.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd3);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd3);
auto& diff = reply->getDiff();
EXPECT_EQ(2u, diff.size());
EXPECT_EQ(EntryCheck(20000u, 4u), diff[0]._entry);
@@ -1312,7 +1312,7 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
*/
fill_entry(diff[0], *doc1, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u; // Simulate diff entry having been applied on node 1.
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled first ApplyBucketDiffReply");
}
ASSERT_EQ(3u, messageKeeper()._msgs.size());
@@ -1320,19 +1320,19 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking second ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 24u), s.diff[baseline_diff_size]);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 24u), s->diff[baseline_diff_size]);
auto& cmd4 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[2]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {3, true}}), cmd4.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd4);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd4);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
EXPECT_EQ(EntryCheck(20100u, 4u), diff[0]._entry);
// Simulate that node 3 somehow lost doc2 when trying to fill diff entry.
diff[0]._entry._hasMask &= ~4u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled second ApplyBucketDiffReply");
}
ASSERT_EQ(4u, messageKeeper()._msgs.size());
@@ -1340,21 +1340,21 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking third ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// Nodes 3 and 2 have been eliminated before the third ApplyBucketDiff command
- EXPECT_EQ((NodeList{{0, false}, {1, false}}), s.nodeList);
- EXPECT_EQ(baseline_diff_size + 1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 16u), s.diff[baseline_diff_size]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}}), s->nodeList);
+ EXPECT_EQ(baseline_diff_size + 1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s->diff[baseline_diff_size]);
auto& cmd5 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[3]);
EXPECT_EQ((NodeList{{0, false}, {1, false}}), cmd5.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd5);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd5);
auto& diff = reply->getDiff();
EXPECT_EQ(baseline_diff_size, diff.size());
for (auto& e : diff) {
EXPECT_EQ(1u, e._entry._hasMask);
e._entry._hasMask |= 2u;
}
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled third ApplyBucketDiffReply");
}
ASSERT_EQ(5u, messageKeeper()._msgs.size());
@@ -1362,19 +1362,19 @@ TEST_F(MergeHandlerTest, partially_filled_apply_bucket_diff_reply)
{
LOG(debug, "checking fourth ApplyBucketDiff command");
EXPECT_TRUE(getEnv()._fileStorHandler.isMerging(_bucket));
- auto &s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
+ auto s = getEnv()._fileStorHandler.editMergeStatus(_bucket);
// All nodes in use again due to failure to fill diff entry for doc2
- EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s.nodeList);
- EXPECT_EQ(1u, s.diff.size());
- EXPECT_EQ(EntryCheck(20100, 16u), s.diff[0]);
+ EXPECT_EQ((NodeList{{0, false}, {1, false}, {2, true}, {3, true}, {4, true}}), s->nodeList);
+ EXPECT_EQ(1u, s->diff.size());
+ EXPECT_EQ(EntryCheck(20100, 16u), s->diff[0]);
auto& cmd6 = dynamic_cast<api::ApplyBucketDiffCommand&>(*messageKeeper()._msgs[4]);
EXPECT_EQ((NodeList{{0, false}, {1, false}, {4, true}}), cmd6.getNodes());
- auto reply = std::make_unique<api::ApplyBucketDiffReply>(cmd6);
+ auto reply = std::make_shared<api::ApplyBucketDiffReply>(cmd6);
auto& diff = reply->getDiff();
EXPECT_EQ(1u, diff.size());
fill_entry(diff[0], *doc2, getEnv().getDocumentTypeRepo());
diff[0]._entry._hasMask |= 2u;
- handler.handleApplyBucketDiffReply(*reply, messageKeeper());
+ handler.handleApplyBucketDiffReply(*reply, messageKeeper(), createTracker(reply, _bucket));
LOG(debug, "handled fourth ApplyBucketDiffReply");
}
ASSERT_EQ(6u, messageKeeper()._msgs.size());
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 568eee1e92c..5c192942521 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -1151,6 +1151,10 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
size_t maxQueue = _throttlers[0]->getMaxQueueSize();
ASSERT_EQ(20, maxQueue);
ASSERT_LT(maxPending, 100);
+
+ EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), 0);
+ EXPECT_EQ(_throttlers[0]->getMetrics().queueSize.getLast(), 0);
+
for (size_t i = 0; i < maxPending + maxQueue; ++i) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
// No chain set, i.e. merge command is freshly squeezed from a distributor.
@@ -1162,6 +1166,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist
// Wait till we have maxPending replies and maxQueue queued
_topLinks[0]->waitForMessages(maxPending, _messageWaitTime);
waitUntilMergeQueueIs(*_throttlers[0], maxQueue, _messageWaitTime);
+ EXPECT_EQ(_throttlers[0]->getMetrics().active_window_size.getLast(), maxPending);
EXPECT_EQ(maxQueue, _throttlers[0]->getMetrics().queueSize.getMaximum());
// Clear all forwarded merges
diff --git a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp
index d3096e6864e..618e49c4238 100644
--- a/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp
+++ b/storage/src/vespa/storage/distributor/idealstatemetricsset.cpp
@@ -39,13 +39,13 @@ MergeBucketMetricSet::MergeBucketMetricSet(const std::string& name, metrics::Met
: OperationMetricSet(name, std::move(tags), description, owner),
source_only_copy_changed("source_only_copy_changed",
{{"logdefault"},{"yamasdefault"}},
- "The number of merge operations where source-only copy changed"),
+ "The number of merge operations where source-only copy changed", this),
source_only_copy_delete_blocked("source_only_copy_delete_blocked",
{{"logdefault"},{"yamasdefault"}},
- "The number of merge operations where delete of unchanged source-only copies was blocked"),
+ "The number of merge operations where delete of unchanged source-only copies was blocked", this),
source_only_copy_delete_failed("source_only_copy_delete_failed",
{{"logdefault"},{"yamasdefault"}},
- "The number of merge operations where delete of unchanged source-only copies failed")
+ "The number of merge operations where delete of unchanged source-only copies failed", this)
{
}
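
The only functional change here is that each child metric now passes `this` as its owner, so it is actually registered with the enclosing MergeBucketMetricSet rather than left unregistered. As a generic illustration of that owner-registration idiom (a plain sketch, not the real metrics framework):

    #include <string>
    #include <utility>
    #include <vector>

    class MetricSet;

    // A child metric registers itself with its owner at construction time.
    class Metric {
    public:
        Metric(std::string name, MetricSet* owner);
        const std::string name;
    };

    class MetricSet {
    public:
        void registerChild(Metric& m) { _children.push_back(&m); }
        const std::vector<Metric*>& children() const { return _children; }
    private:
        std::vector<Metric*> _children;
    };

    Metric::Metric(std::string n, MetricSet* owner) : name(std::move(n)) {
        if (owner != nullptr) {
            owner->registerChild(*this);   // passing no owner leaves the metric unreported
        }
    }
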
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index fbe1c142b09..7f66d1effd5 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -86,7 +86,7 @@ void GarbageCollectionOperation::update_gc_metrics() {
}
bool
-GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const {
+GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const {
return true;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 2e010a61bde..f51739242b7 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -21,7 +21,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "garbagecollection"; };
Type getType() const override { return GARBAGE_COLLECTION; }
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
index b1231fafcd9..744b24b593e 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.cpp
@@ -138,8 +138,7 @@ public:
bool check(uint32_t messageType, uint16_t node, uint8_t priority) override
{
- (void) node;
- if (op.shouldBlockThisOperation(messageType, priority)) {
+ if (op.shouldBlockThisOperation(messageType, node, priority)) {
blocked = true;
return false;
}
@@ -232,6 +231,7 @@ IdealStateOperation::toString() const
bool
IdealStateOperation::shouldBlockThisOperation(uint32_t messageType,
+ [[maybe_unused]] uint16_t node,
uint8_t) const
{
for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) {
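
The default IdealStateOperation implementation keeps ignoring the new node argument and simply scans its sentinel-terminated table of maintenance message types, so behaviour is unchanged for operations that do not override it. A standalone sketch of that scan, with made-up type IDs:

    #include <cstdint>

    // Hypothetical maintenance message type IDs, terminated by 0 as in the real table.
    constexpr uint32_t MAINTENANCE_MESSAGE_TYPES[] = { 11, 12, 13, 0 };

    bool should_block(uint32_t message_type, [[maybe_unused]] uint16_t node, uint8_t /*priority*/) {
        for (uint32_t i = 0; MAINTENANCE_MESSAGE_TYPES[i] != 0; ++i) {
            if (message_type == MAINTENANCE_MESSAGE_TYPES[i]) {
                return true;   // maintenance operations block each other regardless of target node
            }
        }
        return false;
    }
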
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
index d4dc4e405df..f8f35afe821 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/idealstateoperation.h
@@ -217,7 +217,7 @@ public:
/**
* Should return true if the given message type should block this operation.
*/
- virtual bool shouldBlockThisOperation(uint32_t messageType, uint8_t priority) const;
+ virtual bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t priority) const;
protected:
friend struct IdealStateManagerTest;
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
index 7cfe4172b2c..f951a880e5d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp
@@ -329,14 +329,14 @@ constexpr std::array<uint32_t, 7> WRITE_FEED_MESSAGE_TYPES {{
}
-bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const {
+bool MergeOperation::shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const {
for (auto blocking_type : WRITE_FEED_MESSAGE_TYPES) {
if (messageType == blocking_type) {
return true;
}
}
- return IdealStateOperation::shouldBlockThisOperation(messageType, pri);
+ return IdealStateOperation::shouldBlockThisOperation(messageType, node, pri);
}
bool MergeOperation::isBlocked(const DistributorStripeOperationContext& ctx,
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
index 1bca1f7389f..832c0f99681 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h
@@ -48,7 +48,7 @@ public:
const document::BucketId&, MergeLimiter&,
std::vector<MergeMetaData>&);
- bool shouldBlockThisOperation(uint32_t messageType, uint8_t pri) const override;
+ bool shouldBlockThisOperation(uint32_t messageType, uint16_t node, uint8_t pri) const override;
bool isBlocked(const DistributorStripeOperationContext& ctx, const OperationSequencer&) const override;
private:
static void addIdealNodes(
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index 9a57722dc7e..25cae5b9979 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -61,8 +61,7 @@ RemoveBucketOperation::onStart(DistributorStripeMessageSender& sender)
bool
RemoveBucketOperation::onReceiveInternal(const std::shared_ptr<api::StorageReply> &msg)
{
- api::DeleteBucketReply* rep =
- dynamic_cast<api::DeleteBucketReply*>(msg.get());
+ auto* rep = dynamic_cast<api::DeleteBucketReply*>(msg.get());
uint16_t node = _tracker.handleReply(*rep);
@@ -112,8 +111,15 @@ RemoveBucketOperation::onReceive(DistributorStripeMessageSender&, const std::sha
}
bool
-RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint8_t) const
+RemoveBucketOperation::shouldBlockThisOperation(uint32_t, uint16_t target_node, uint8_t) const
{
- return true;
+ // Number of nodes is expected to be 1 in the vastly common case (and a highly bounded
+ // number in the worst case), so a simple linear scan suffices.
+ for (uint16_t node : getNodes()) {
+ if (target_node == node) {
+ return true;
+ }
+ }
+ return false;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
index a0d496f948a..5e0922d5685 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.h
@@ -30,7 +30,7 @@ public:
void onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
const char* getName() const override { return "remove"; };
Type getType() const override { return DELETE_BUCKET; }
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
};
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 649503cf0f5..6f3924535ef 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -150,6 +150,7 @@ SplitOperation::isBlocked(const DistributorStripeOperationContext& ctx, const Op
bool
SplitOperation::shouldBlockThisOperation(uint32_t msgType,
+ [[maybe_unused]] uint16_t node,
uint8_t pri) const
{
if (msgType == api::MessageType::SPLITBUCKET_ID && _priority >= pri) {
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
index ee957309088..6a268155fc8 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.h
@@ -21,7 +21,7 @@ public:
const char* getName() const override { return "split"; };
Type getType() const override { return SPLIT_BUCKET; }
bool isBlocked(const DistributorStripeOperationContext&, const OperationSequencer&) const override;
- bool shouldBlockThisOperation(uint32_t, uint8_t) const override;
+ bool shouldBlockThisOperation(uint32_t, uint16_t, uint8_t) const override;
protected:
MessageTracker _tracker;
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index 4a28e650fac..d150f5600e5 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -3,11 +3,16 @@
#include "asynchandler.h"
#include "persistenceutil.h"
#include "testandsethelper.h"
+#include "bucketownershipnotifier.h"
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/vespalib/util/isequencedtaskexecutor.h>
#include <vespa/vespalib/util/destructor_callbacks.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".storage.persistence.asynchandler");
+
namespace storage {
namespace {
@@ -86,12 +91,26 @@ private:
vespalib::ISequencedTaskExecutor::ExecutorId _executorId;
};
+bool
+bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
+ // Don't check document sizes, as background moving of documents in Proton
+ // may trigger a change in size without any mutations taking place. This will
+ // only take place when a document being moved was fed _prior_ to the change
+ // where Proton starts reporting actual document sizes, and will eventually
+ // converge to a stable value. But for now, ignore it to prevent false positive
+ // error logs and non-deleted buckets.
+ return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
}
+
+}
+
AsyncHandler::AsyncHandler(const PersistenceUtil & env, spi::PersistenceProvider & spi,
+ BucketOwnershipNotifier &bucketOwnershipNotifier,
vespalib::ISequencedTaskExecutor & executor,
const document::BucketIdFactory & bucketIdFactory)
: _env(env),
_spi(spi),
+ _bucketOwnershipNotifier(bucketOwnershipNotifier),
_sequencedExecutor(executor),
_bucketIdFactory(bucketIdFactory)
{}
@@ -135,6 +154,79 @@ AsyncHandler::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) cons
}
MessageTracker::UP
+AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
+{
+ tracker->setMetric(_env._metrics.deleteBuckets);
+ LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
+ if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
+ _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
+ }
+ spi::Bucket bucket(cmd.getBucket());
+ if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
+ return tracker;
+ }
+
+ auto task = makeResultTask([this, tracker = std::move(tracker), bucket=cmd.getBucket()](spi::Result::UP ignored) {
+ // TODO Even if a non-OK response cannot be handled sanely here, we should probably log a message or increment a metric
+ (void) ignored;
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(), "onDeleteBucket");
+ if (entry.exist() && entry->getMetaCount() > 0) {
+ LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
+ "active operation when delete bucket was queued. "
+ "Updating bucket database to keep it in sync with file. "
+ "Cannot delete bucket from bucket database at this "
+ "point, as it can have been intentionally recreated "
+ "after delete bucket had been sent",
+ bucket.getBucketId().toString().c_str());
+ api::BucketInfo info(0, 0, 0);
+ // Only set document counts/size; retain ready/active state.
+ info.setReady(entry->getBucketInfo().isReady());
+ info.setActive(entry->getBucketInfo().isActive());
+
+ entry->setBucketInfo(info);
+ entry.write();
+ }
+ tracker->sendReply();
+ });
+ _spi.deleteBucketAsync(bucket, tracker->context(), std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return tracker;
+}
+
+MessageTracker::UP
+AsyncHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP trackerUP) const
+{
+ trackerUP->setMetric(_env._metrics.setBucketStates);
+
+ //LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
+ spi::Bucket bucket(cmd.getBucket());
+ bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
+ spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
+
+ auto task = makeResultTask([this, &cmd, newState, tracker = std::move(trackerUP), bucket,
+ notifyGuard = std::make_unique<NotificationGuard>(_bucketOwnershipNotifier)](spi::Result::UP response) mutable {
+ if (tracker->checkForError(*response)) {
+ StorBucketDatabase &db(_env.getBucketDatabase(bucket.getBucketSpace()));
+ StorBucketDatabase::WrappedEntry entry = db.get(bucket.getBucketId(),"handleSetBucketState");
+ if (entry.exist()) {
+ entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
+ notifyGuard->notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
+ entry.write();
+ } else {
+ LOG(warning, "Got OK setCurrentState result from provider for %s, "
+ "but bucket has disappeared from service layer database",
+ cmd.getBucketId().toString().c_str());
+ }
+ tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
+ }
+ tracker->sendReply();
+ });
+ _spi.setActiveStateAsync(bucket, newState, std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
+ return trackerUP;
+}
+
+MessageTracker::UP
AsyncHandler::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) const
{
MessageTracker & tracker = *trackerUP;
@@ -233,4 +325,31 @@ AsyncHandler::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTra
return true;
}
+bool
+AsyncHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
+{
+ spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
+ if (result.hasError()) {
+ LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
+ bucket.toString().c_str(), result.getErrorMessage().c_str());
+ return false;
+ }
+ api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
+ // Don't check meta fields or active/ready fields since these are not
+ // that important and ready may change under the hood in a race with
+ // getModifiedBuckets(). If bucket is empty it means it has already
+ // been deleted by a racing split/join.
+ if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
+ LOG(error,
+ "Service layer bucket database and provider out of sync before "
+ "deleting bucket %s! Service layer db had %s while provider says "
+ "bucket has %s. Deletion has been rejected to ensure data is not "
+ "lost, but bucket may remain out of sync until service has been "
+ "restarted.",
+ bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
+ return false;
+ }
+ return true;
+}
+
}
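
Both new handlers above follow the same recipe: package the "what to do once the provider is done" logic as a result task that owns the message tracker, hand it to the async SPI call, and send the reply only from inside that task. Below is a reduced sketch of that shape, with invented names and none of the bucket-database bookkeeping.

    #include <iostream>
    #include <memory>
    #include <string>
    #include <utility>

    // Hypothetical minimal analogues of spi::Result and MessageTracker.
    struct Result {
        bool ok = true;
        std::string message;
    };

    struct Tracker {
        void sendReply() { std::cout << "reply sent\n"; }
    };

    // Stand-in for an async provider call: invokes the completion task when done.
    // A template parameter keeps the task move-only friendly (it captures a unique_ptr).
    template <typename Task>
    void delete_bucket_async(Task&& on_done) {
        std::forward<Task>(on_done)(std::make_unique<Result>());   // pretend the delete succeeded
    }

    int main() {
        auto tracker = std::make_unique<Tracker>();
        // The lambda takes ownership of the tracker, would update local state on
        // completion, and only then sends the reply -- the same ordering the
        // handleDeleteBucket / handleSetBucketState tasks follow.
        auto task = [tracker = std::move(tracker)](std::unique_ptr<Result> result) {
            if (!result->ok) {
                // A real handler would log the failure or bump a metric here.
            }
            tracker->sendReply();
        };
        delete_bucket_async(std::move(task));
        return 0;
    }
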
diff --git a/storage/src/vespa/storage/persistence/asynchandler.h b/storage/src/vespa/storage/persistence/asynchandler.h
index 23f3605dca1..4f5c242570c 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.h
+++ b/storage/src/vespa/storage/persistence/asynchandler.h
@@ -14,6 +14,7 @@ namespace spi {
class Context;
}
class PersistenceUtil;
+class BucketOwnershipNotifier;
/**
* Handle async operations that uses a sequenced executor.
@@ -21,19 +22,23 @@ class PersistenceUtil;
*/
class AsyncHandler : public Types {
public:
- AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, vespalib::ISequencedTaskExecutor & executor,
- const document::BucketIdFactory & bucketIdFactory);
+ AsyncHandler(const PersistenceUtil&, spi::PersistenceProvider&, BucketOwnershipNotifier &,
+ vespalib::ISequencedTaskExecutor & executor, const document::BucketIdFactory & bucketIdFactory);
MessageTrackerUP handlePut(api::PutCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRemove(api::RemoveCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleUpdate(api::UpdateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRunTask(RunTaskCommand & cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
+ MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
static bool is_async_message(api::MessageType::Id type_id) noexcept;
private:
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
static bool tasConditionExists(const api::TestAndSetCommand & cmd);
bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker,
spi::Context & context, bool missingDocumentImpliesMatch = false) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
+ BucketOwnershipNotifier & _bucketOwnershipNotifier;
vespalib::ISequencedTaskExecutor & _sequencedExecutor;
const document::BucketIdFactory & _bucketIdFactory;
};
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 0dcb8539bff..70ed9845cb0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -200,13 +200,13 @@ public:
virtual void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) = 0;
/**
- * Returns the reference to the current merge status for the given bucket.
+ * Returns a shared pointer to the current merge status for the given bucket.
* This allows unlocked access to an internal variable, so users should
* first check that noone else is using it by calling isMerging() first.
*
* @param bucket The bucket to start merging.
*/
- virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0;
+ virtual std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket& bucket) = 0;
/**
* Returns true if the bucket is currently being merged on this node.
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 62f9fa12a21..e395a7df9e0 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -86,7 +86,7 @@ FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, std::shared_
_mergeStates[bucket] = status;
}
-MergeStatus&
+std::shared_ptr<MergeStatus>
FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
{
std::lock_guard mlock(_mergeStatesLock);
@@ -94,7 +94,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket)
if ( ! status ) {
throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC);
}
- return *status;
+ return status;
}
bool
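
Returning a shared_ptr copy instead of a reference means the caller's MergeStatus stays alive even if clearMergeStatus() erases the map entry after the lock is released, which is what the s-> usages in mergehandler.cpp below rely on. A generic sketch of the same ownership shape, assuming a simple keyed registry:

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <stdexcept>

    struct MergeState { /* diff, node list, pending ids, ... */ };

    class MergeStateRegistry {
    public:
        // Returns a shared_ptr copy: the state remains valid for the caller even
        // if another thread clears the entry right after the lock is released.
        std::shared_ptr<MergeState> edit(uint64_t bucket) {
            std::lock_guard<std::mutex> guard(_lock);
            auto it = _states.find(bucket);
            if (it == _states.end()) {
                throw std::runtime_error("no merge state exists for bucket");
            }
            return it->second;
        }

        void clear(uint64_t bucket) {
            std::lock_guard<std::mutex> guard(_lock);
            _states.erase(bucket);
        }

    private:
        mutable std::mutex _lock;
        std::map<uint64_t, std::shared_ptr<MergeState>> _states;
    };
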
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index a3dc316cdde..5f212b18a7f 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -218,7 +218,7 @@ public:
}
void addMergeStatus(const document::Bucket&, std::shared_ptr<MergeStatus>) override;
- MergeStatus& editMergeStatus(const document::Bucket&) override;
+ std::shared_ptr<MergeStatus> editMergeStatus(const document::Bucket&) override;
bool isMerging(const document::Bucket&) const override;
void clearMergeStatus(const document::Bucket& bucket) override;
void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override;
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index ddcab0f3659..2ffb827accf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -10,7 +10,6 @@
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/storage/common/messagebucket.h>
-#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/persistence/bucketownershipnotifier.h>
#include <vespa/storage/persistence/persistencethread.h>
#include <vespa/storage/persistence/persistencehandler.h>
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7693156ae30..963fddd9fb5 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -23,13 +23,15 @@ namespace storage {
MergeHandler::MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
uint32_t maxChunkSize,
- uint32_t commonMergeChainOptimalizationMinimumSize)
+ uint32_t commonMergeChainOptimalizationMinimumSize,
+ bool async_apply_bucket_diff)
: _clock(clock),
_cluster_context(cluster_context),
_env(env),
_spi(spi),
_maxChunkSize(maxChunkSize),
- _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize)
+ _commonMergeChainOptimalizationMinimumSize(commonMergeChainOptimalizationMinimumSize),
+ _async_apply_bucket_diff(async_apply_bucket_diff)
{
}
@@ -1136,49 +1138,49 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
- if (s.pendingId != reply.getMsgId()) {
+ auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
+ if (s->pendingId != reply.getMsgId()) {
LOG(warning, "Got GetBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
- bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
+ bucket.toString().c_str(), reply.getMsgId(), s->pendingId);
return;
}
api::StorageReply::SP replyToSend;
bool clearState = true;
try {
- if (s.isFirstNode()) {
+ if (s->isFirstNode()) {
if (reply.getResult().failed()) {
// We failed, so we should reply to the pending message.
- replyToSend = s.reply;
+ replyToSend = s->reply;
} else {
// If we didn't fail, reply should have good content
// Sanity check for nodes
assert(reply.getNodes().size() >= 2);
// Get bucket diff should retrieve all info at once
- assert(s.diff.size() == 0);
- s.diff.insert(s.diff.end(),
+ assert(s->diff.size() == 0);
+ s->diff.insert(s->diff.end(),
reply.getDiff().begin(),
reply.getDiff().end());
- replyToSend = processBucketMerge(bucket, s, sender, s.context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context);
if (!replyToSend.get()) {
// We have sent something on, and shouldn't reply now.
clearState = false;
} else {
_env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(
- s.startTime.getElapsedTimeAsDouble());
+ s->startTime.getElapsedTimeAsDouble());
}
}
} else {
// Exists in send on list, send on!
- replyToSend = s.pendingGetDiff;
+ replyToSend = s->pendingGetDiff;
LOG(spam, "Received GetBucketDiffReply for %s with diff of "
"size %zu. Sending it on.",
bucket.toString().c_str(), reply.getDiff().size());
- s.pendingGetDiff->getDiff().swap(reply.getDiff());
+ s->pendingGetDiff->getDiff().swap(reply.getDiff());
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
@@ -1282,8 +1284,9 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
}
void
-MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender) const
+MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,MessageSender& sender, MessageTracker::UP tracker) const
{
+ (void) tracker;
_env._metrics.applyBucketDiffReply.inc();
spi::Bucket bucket(reply.getBucket());
ApplyBucketDiffState async_results(*this, bucket);
@@ -1296,11 +1299,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
return;
}
- MergeStatus& s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
- if (s.pendingId != reply.getMsgId()) {
+ auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
+ if (s->pendingId != reply.getMsgId()) {
LOG(warning, "Got ApplyBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
- bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
+ bucket.toString().c_str(), reply.getMsgId(), s->pendingId);
return;
}
bool clearState = true;
@@ -1315,12 +1318,12 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
uint8_t index = findOwnIndex(reply.getNodes(), _env._nodeIndex);
if (applyDiffNeedLocalData(diff, index, false)) {
framework::MilliSecTimer startTime(_clock);
- fetchLocalData(bucket, diff, index, s.context);
+ fetchLocalData(bucket, diff, index, s->context);
_env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
}
if (applyDiffHasLocallyNeededData(diff, index)) {
framework::MilliSecTimer startTime(_clock);
- applyDiffLocally(bucket, diff, index, s.context, async_results);
+ applyDiffLocally(bucket, diff, index, s->context, async_results);
_env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
async_results.check();
async_results.sync_bucket_info();
@@ -1332,50 +1335,50 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,Messag
}
}
- if (s.isFirstNode()) {
+ if (s->isFirstNode()) {
uint16_t hasMask = 0;
for (uint16_t i=0; i<reply.getNodes().size(); ++i) {
hasMask |= (1 << i);
}
- const size_t diffSizeBefore = s.diff.size();
- const bool altered = s.removeFromDiff(diff, hasMask, reply.getNodes());
+ const size_t diffSizeBefore = s->diff.size();
+ const bool altered = s->removeFromDiff(diff, hasMask, reply.getNodes());
if (reply.getResult().success()
- && s.diff.size() == diffSizeBefore
+ && s->diff.size() == diffSizeBefore
&& !altered)
{
std::string msg(
vespalib::make_string(
"Completed merge cycle without fixing "
"any entries (merge state diff at %zu entries)",
- s.diff.size()));
+ s->diff.size()));
returnCode = api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, msg);
LOG(warning,
"Got reply indicating merge cycle did not fix any entries: %s",
reply.toString(true).c_str());
LOG(warning,
"Merge state for which there was no progress across a full merge cycle: %s",
- s.toString().c_str());
+ s->toString().c_str());
}
if (returnCode.failed()) {
// Should reply now, since we failed.
- replyToSend = s.reply;
+ replyToSend = s->reply;
} else {
- replyToSend = processBucketMerge(bucket, s, sender, s.context);
+ replyToSend = processBucketMerge(bucket, *s, sender, s->context);
if (!replyToSend.get()) {
// We have sent something on and shouldn't reply now.
clearState = false;
} else {
- _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s.startTime.getElapsedTimeAsDouble());
+ _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble());
}
}
} else {
- replyToSend = s.pendingApplyDiff;
+ replyToSend = s->pendingApplyDiff;
LOG(debug, "ApplyBucketDiff(%s) finished. Sending reply.",
bucket.toString().c_str());
- s.pendingApplyDiff->getDiff().swap(reply.getDiff());
+ s->pendingApplyDiff->getDiff().swap(reply.getDiff());
}
} catch (std::exception& e) {
_env._fileStorHandler.clearMergeStatus(
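The mergehandler.cpp hunks above all stem from one underlying change: editMergeStatus() now hands back a pointer-like handle to the MergeStatus (presumably a shared pointer) rather than a plain reference, so member access switches from s. to s-> and the handle is dereferenced with *s where a MergeStatus& is still expected. A condensed sketch of the resulting access pattern, assuming that return type:

    // Condensed sketch only; the exact return type of editMergeStatus() is assumed
    // to be a shared pointer to MergeStatus.
    auto s = _env._fileStorHandler.editMergeStatus(bucket.getBucket());
    if (s->pendingId != reply.getMsgId()) {
        return;  // reply belongs to an older message id, ignore it
    }
    // processBucketMerge() still takes a MergeStatus&, so the handle is dereferenced.
    api::StorageReply::SP replyToSend = processBucketMerge(bucket, *s, sender, s->context);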
diff --git a/storage/src/vespa/storage/persistence/mergehandler.h b/storage/src/vespa/storage/persistence/mergehandler.h
index fa7e21dae78..0ff8f3c0ef8 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.h
+++ b/storage/src/vespa/storage/persistence/mergehandler.h
@@ -45,7 +45,8 @@ public:
MergeHandler(PersistenceUtil& env, spi::PersistenceProvider& spi,
const ClusterContext& cluster_context, const framework::Clock & clock,
uint32_t maxChunkSize = 4190208,
- uint32_t commonMergeChainOptimalizationMinimumSize = 64);
+ uint32_t commonMergeChainOptimalizationMinimumSize = 64,
+ bool async_apply_bucket_diff = false);
bool buildBucketInfoList(
const spi::Bucket& bucket,
@@ -68,7 +69,7 @@ public:
MessageTrackerUP handleGetBucketDiff(api::GetBucketDiffCommand&, MessageTrackerUP) const;
void handleGetBucketDiffReply(api::GetBucketDiffReply&, MessageSender&) const;
MessageTrackerUP handleApplyBucketDiff(api::ApplyBucketDiffCommand&, MessageTrackerUP) const;
- void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&) const;
+ void handleApplyBucketDiffReply(api::ApplyBucketDiffReply&, MessageSender&, MessageTrackerUP) const;
private:
const framework::Clock &_clock;
@@ -77,6 +78,7 @@ private:
spi::PersistenceProvider &_spi;
const uint32_t _maxChunkSize;
const uint32_t _commonMergeChainOptimalizationMinimumSize;
+ const bool _async_apply_bucket_diff;
/** Returns a reply if merge is complete */
api::StorageReply::SP processBucketMerge(const spi::Bucket& bucket,
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.cpp b/storage/src/vespa/storage/persistence/persistencehandler.cpp
index 5315d3ec0bc..1ef883fc810 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.cpp
+++ b/storage/src/vespa/storage/persistence/persistencehandler.cpp
@@ -19,8 +19,9 @@ PersistenceHandler::PersistenceHandler(vespalib::ISequencedTaskExecutor & sequen
_processAllHandler(_env, provider),
_mergeHandler(_env, provider, component.cluster_context(), _clock,
cfg.bucketMergeChunkSize,
- cfg.commonMergeChainOptimalizationMinimumSize),
- _asyncHandler(_env, provider, sequencedExecutor, component.getBucketIdFactory()),
+ cfg.commonMergeChainOptimalizationMinimumSize,
+ cfg.asyncApplyBucketDiff),
+ _asyncHandler(_env, provider, bucketOwnershipNotifier, sequencedExecutor, component.getBucketIdFactory()),
_splitJoinHandler(_env, provider, bucketOwnershipNotifier, cfg.enableMultibitSplitOptimalization),
_simpleHandler(_env, provider)
{
@@ -45,7 +46,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::CREATEBUCKET_ID:
return _simpleHandler.handleCreateBucket(static_cast<api::CreateBucketCommand&>(msg), std::move(tracker));
case api::MessageType::DELETEBUCKET_ID:
- return _simpleHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleDeleteBucket(static_cast<api::DeleteBucketCommand&>(msg), std::move(tracker));
case api::MessageType::JOINBUCKETS_ID:
return _splitJoinHandler.handleJoinBuckets(static_cast<api::JoinBucketsCommand&>(msg), std::move(tracker));
case api::MessageType::SPLITBUCKET_ID:
@@ -62,7 +63,7 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
case api::MessageType::APPLYBUCKETDIFF_ID:
return _mergeHandler.handleApplyBucketDiff(static_cast<api::ApplyBucketDiffCommand&>(msg), std::move(tracker));
case api::MessageType::SETBUCKETSTATE_ID:
- return _splitJoinHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
+ return _asyncHandler.handleSetBucketState(static_cast<api::SetBucketStateCommand&>(msg), std::move(tracker));
case api::MessageType::INTERNAL_ID:
switch(static_cast<api::InternalCommand&>(msg).getType()) {
case GetIterCommand::ID:
@@ -87,19 +88,20 @@ PersistenceHandler::handleCommandSplitByType(api::StorageCommand& msg, MessageTr
return MessageTracker::UP();
}
-void
-PersistenceHandler::handleReply(api::StorageReply& reply) const
+MessageTracker::UP
+PersistenceHandler::handleReply(api::StorageReply& reply, MessageTracker::UP tracker) const
{
switch (reply.getType().getId()) {
case api::MessageType::GETBUCKETDIFF_REPLY_ID:
_mergeHandler.handleGetBucketDiffReply(static_cast<api::GetBucketDiffReply&>(reply), _env._fileStorHandler);
break;
case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
- _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler);
+ _mergeHandler.handleApplyBucketDiffReply(static_cast<api::ApplyBucketDiffReply&>(reply), _env._fileStorHandler, std::move(tracker));
break;
default:
break;
}
+ return tracker;
}
MessageTracker::UP
@@ -112,7 +114,7 @@ PersistenceHandler::processMessage(api::StorageMessage& msg, MessageTracker::UP
try{
LOG(debug, "Handling reply: %s", msg.toString().c_str());
LOG(spam, "Message content: %s", msg.toString(true).c_str());
- handleReply(static_cast<api::StorageReply&>(msg));
+ return handleReply(static_cast<api::StorageReply&>(msg), std::move(tracker));
} catch (std::exception& e) {
// It's a reply, so nothing we can do.
LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what());
diff --git a/storage/src/vespa/storage/persistence/persistencehandler.h b/storage/src/vespa/storage/persistence/persistencehandler.h
index a800d1d4053..a92c2dc78ca 100644
--- a/storage/src/vespa/storage/persistence/persistencehandler.h
+++ b/storage/src/vespa/storage/persistence/persistencehandler.h
@@ -38,7 +38,7 @@ public:
private:
// Message handling functions
MessageTracker::UP handleCommandSplitByType(api::StorageCommand&, MessageTracker::UP tracker) const;
- void handleReply(api::StorageReply&) const;
+ MessageTracker::UP handleReply(api::StorageReply&, MessageTracker::UP) const;
MessageTracker::UP processMessage(api::StorageMessage& msg, MessageTracker::UP tracker) const;
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 471d3d62a35..ce424f0ce83 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -60,10 +60,11 @@ ProviderErrorWrapper::setClusterState(BucketSpace bucketSpace, const spi::Cluste
return checkResult(_impl.setClusterState(bucketSpace, state));
}
-spi::Result
-ProviderErrorWrapper::setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState)
+void
+ProviderErrorWrapper::setActiveStateAsync(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.setActiveState(bucket, newState));
+ onComplete->addResultHandler(this);
+ _impl.setActiveStateAsync(bucket, newState, std::move(onComplete));
}
spi::BucketInfoResult
@@ -72,32 +73,6 @@ ProviderErrorWrapper::getBucketInfo(const spi::Bucket& bucket) const
return checkResult(_impl.getBucketInfo(bucket));
}
-spi::Result
-ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::DocumentSP doc, spi::Context& context)
-{
- return checkResult(_impl.put(bucket, ts, std::move(doc), context));
-}
-
-spi::RemoveResult
-ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context)
-{
- return checkResult(_impl.remove(bucket, ts, docId, context));
-}
-
-spi::RemoveResult
-ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts,
- const document::DocumentId& docId, spi::Context& context)
-{
- return checkResult(_impl.removeIfFound(bucket, ts, docId, context));
-}
-
-spi::UpdateResult
-ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts,
- spi::DocumentUpdateSP docUpdate, spi::Context& context)
-{
- return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context));
-}
-
spi::GetResult
ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
const document::DocumentId& docId, spi::Context& context) const
@@ -130,10 +105,11 @@ ProviderErrorWrapper::createBucket(const spi::Bucket& bucket, spi::Context& cont
return checkResult(_impl.createBucket(bucket, context));
}
-spi::Result
-ProviderErrorWrapper::deleteBucket(const spi::Bucket& bucket, spi::Context& context)
+void
+ProviderErrorWrapper::deleteBucketAsync(const spi::Bucket& bucket, spi::Context& context, spi::OperationComplete::UP onComplete)
{
- return checkResult(_impl.deleteBucket(bucket, context));
+ onComplete->addResultHandler(this);
+ _impl.deleteBucketAsync(bucket, context, std::move(onComplete));
}
spi::BucketIdListResult
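Both async overrides in provider_error_wrapper.cpp follow the same forwarding pattern: the wrapper registers itself as a result handler on the completion object, so provider failures still reach the same error-tracking path that checkResult() covers for the synchronous calls, and then delegates to the wrapped provider. The mutating operations already declared async in the header (removeAsync and friends) presumably follow the same shape; the body below is a sketch and is not part of this diff:

    // Sketch of the shared forwarding pattern; removeAsync() is declared in the
    // header hunks below, but this body is assumed rather than shown here.
    void
    ProviderErrorWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp ts,
                                      const document::DocumentId& docId, spi::Context& context,
                                      spi::OperationComplete::UP onComplete)
    {
        onComplete->addResultHandler(this);                              // errors flow back through the wrapper
        _impl.removeAsync(bucket, ts, docId, context, std::move(onComplete)); // forward to the real provider
    }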
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index d23cce9172a..c9d2411e372 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -41,12 +41,8 @@ public:
spi::Result initialize() override;
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::Result setClusterState(BucketSpace bucketSpace, const spi::ClusterState&) override;
- spi::Result setActiveState(const spi::Bucket& bucket, spi::BucketInfo::ActiveState newState) override;
+
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
- spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
- spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
- spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult
createIterator(const spi::Bucket &bucket, FieldSetSP, const spi::Selection &, spi::IncludedVersions versions,
@@ -54,7 +50,6 @@ public:
spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override;
spi::Result destroyIterator(spi::IteratorId, spi::Context&) override;
spi::Result createBucket(const spi::Bucket&, spi::Context&) override;
- spi::Result deleteBucket(const spi::Bucket&, spi::Context&) override;
spi::BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
spi::Result split(const spi::Bucket& source, const spi::Bucket& target1, const spi::Bucket& target2, spi::Context&) override;
spi::Result join(const spi::Bucket& source1, const spi::Bucket& source2, const spi::Bucket& target, spi::Context&) override;
@@ -67,6 +62,8 @@ public:
void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
+ void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;
+ void deleteBucketAsync(const spi::Bucket&, spi::Context&, spi::OperationComplete::UP) override;
std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<spi::BucketExecutor> executor) override;
private:
template <typename ResultType>
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
index b4c09f09e63..b4fe207e2e5 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.cpp
@@ -43,16 +43,6 @@ getFieldSet(const document::FieldSetRepo & repo, vespalib::stringref name, Messa
return document::FieldSet::SP();
}
-bool
-bucketStatesAreSemanticallyEqual(const api::BucketInfo& a, const api::BucketInfo& b) {
- // Don't check document sizes, as background moving of documents in Proton
- // may trigger a change in size without any mutations taking place. This will
- // only take place when a document being moved was fed _prior_ to the change
- // where Proton starts reporting actual document sizes, and will eventually
- // converge to a stable value. But for now, ignore it to prevent false positive
- // error logs and non-deleted buckets.
- return ((a.getChecksum() == b.getChecksum()) && (a.getDocumentCount() == b.getDocumentCount()));
-}
}
SimpleMessageHandler::SimpleMessageHandler(const PersistenceUtil& env, spi::PersistenceProvider& spi)
: _env(env),
@@ -113,70 +103,6 @@ SimpleMessageHandler::handleCreateBucket(api::CreateBucketCommand& cmd, MessageT
return tracker;
}
-bool
-SimpleMessageHandler::checkProviderBucketInfoMatches(const spi::Bucket& bucket, const api::BucketInfo& info) const
-{
- spi::BucketInfoResult result(_spi.getBucketInfo(bucket));
- if (result.hasError()) {
- LOG(error, "getBucketInfo(%s) failed before deleting bucket; got error '%s'",
- bucket.toString().c_str(), result.getErrorMessage().c_str());
- return false;
- }
- api::BucketInfo providerInfo(PersistenceUtil::convertBucketInfo(result.getBucketInfo()));
- // Don't check meta fields or active/ready fields since these are not
- // that important and ready may change under the hood in a race with
- // getModifiedBuckets(). If bucket is empty it means it has already
- // been deleted by a racing split/join.
- if (!bucketStatesAreSemanticallyEqual(info, providerInfo) && !providerInfo.empty()) {
- LOG(error,
- "Service layer bucket database and provider out of sync before "
- "deleting bucket %s! Service layer db had %s while provider says "
- "bucket has %s. Deletion has been rejected to ensure data is not "
- "lost, but bucket may remain out of sync until service has been "
- "restarted.",
- bucket.toString().c_str(), info.toString().c_str(), providerInfo.toString().c_str());
- return false;
- }
- return true;
-}
-
-MessageTracker::UP
-SimpleMessageHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.deleteBuckets);
- LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str());
- if (_env._fileStorHandler.isMerging(cmd.getBucket())) {
- _env._fileStorHandler.clearMergeStatus(cmd.getBucket(),
- api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge"));
- }
- spi::Bucket bucket(cmd.getBucket());
- if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) {
- return tracker;
- }
- _spi.deleteBucket(bucket, tracker->context());
- StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace()));
- {
- StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket"));
- if (entry.exist() && entry->getMetaCount() > 0) {
- LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely "
- "active operation when delete bucket was queued. "
- "Updating bucket database to keep it in sync with file. "
- "Cannot delete bucket from bucket database at this "
- "point, as it can have been intentionally recreated "
- "after delete bucket had been sent",
- cmd.getBucketId().toString().c_str());
- api::BucketInfo info(0, 0, 0);
- // Only set document counts/size; retain ready/active state.
- info.setReady(entry->getBucketInfo().isReady());
- info.setActive(entry->getBucketInfo().isActive());
-
- entry->setBucketInfo(info);
- entry.write();
- }
- }
- return tracker;
-}
-
MessageTracker::UP
SimpleMessageHandler::handleGetIter(GetIterCommand& cmd, MessageTracker::UP tracker) const
{
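The synchronous delete-bucket path (including the checkProviderBucketInfoMatches() consistency check) is removed from SimpleMessageHandler here; per the persistencehandler.cpp change above, DELETEBUCKET is now routed to the async handler, which can build on the new deleteBucketAsync() SPI entry point instead of the blocking deleteBucket() call. AsyncHandler's real implementation is not part of this diff; the following is only a hedged illustration of how the removed blocking call maps onto the async entry point, with DeleteBucketCompletion being a hypothetical callback type:

    // Hedged illustration only; AsyncHandler::handleDeleteBucket() is not shown in
    // this diff, and DeleteBucketCompletion is a hypothetical OperationComplete
    // subtype that would own the tracker and finish the reply when the provider
    // reports completion.
    MessageTracker::UP
    AsyncHandler::handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTracker::UP tracker) const
    {
        spi::Bucket bucket(cmd.getBucket());
        spi::Context& context = tracker->context();    // same context the blocking call used
        auto onComplete = std::make_unique<DeleteBucketCompletion>(std::move(tracker));
        _spi.deleteBucketAsync(bucket, context, std::move(onComplete));
        return MessageTracker::UP();                   // reply is produced later by the callback
    }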
diff --git a/storage/src/vespa/storage/persistence/simplemessagehandler.h b/storage/src/vespa/storage/persistence/simplemessagehandler.h
index 9f00f67684d..2cfbc7016c0 100644
--- a/storage/src/vespa/storage/persistence/simplemessagehandler.h
+++ b/storage/src/vespa/storage/persistence/simplemessagehandler.h
@@ -23,13 +23,11 @@ public:
MessageTrackerUP handleGet(api::GetCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRevert(api::RevertCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateBucket(api::CreateBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleDeleteBucket(api::DeleteBucketCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleCreateIterator(CreateIteratorCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleGetIter(GetIterCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketList(ReadBucketList& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleReadBucketInfo(ReadBucketInfo& cmd, MessageTrackerUP tracker) const;
private:
- bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
const PersistenceUtil & _env;
spi::PersistenceProvider & _spi;
};
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
index 0856f45c3ff..d5b44cc1911 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.cpp
@@ -5,7 +5,6 @@
#include "bucketownershipnotifier.h"
#include "splitbitdetector.h"
#include "messages.h"
-#include <vespa/storage/common/bucketmessages.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/storageapi/message/bucket.h>
@@ -145,37 +144,6 @@ SplitJoinHandler::handleSplitBucket(api::SplitBucketCommand& cmd, MessageTracker
}
MessageTracker::UP
-SplitJoinHandler::handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTracker::UP tracker) const
-{
- tracker->setMetric(_env._metrics.setBucketStates);
- NotificationGuard notifyGuard(_bucketOwnershipNotifier);
-
- LOG(debug, "handleSetBucketState(): %s", cmd.toString().c_str());
- spi::Bucket bucket(cmd.getBucket());
- bool shouldBeActive(cmd.getState() == api::SetBucketStateCommand::ACTIVE);
- spi::BucketInfo::ActiveState newState(shouldBeActive ? spi::BucketInfo::ACTIVE : spi::BucketInfo::NOT_ACTIVE);
-
- spi::Result result(_spi.setActiveState(bucket, newState));
- if (tracker->checkForError(result)) {
- StorBucketDatabase::WrappedEntry
- entry = _env.getBucketDatabase(bucket.getBucket().getBucketSpace()).get(cmd.getBucketId(), "handleSetBucketState");
- if (entry.exist()) {
- entry->info.setActive(newState == spi::BucketInfo::ACTIVE);
- notifyGuard.notifyIfOwnershipChanged(cmd.getBucket(), cmd.getSourceIndex(), entry->info);
- entry.write();
- } else {
- LOG(warning, "Got OK setCurrentState result from provider for %s, "
- "but bucket has disappeared from service layer database",
- cmd.getBucketId().toString().c_str());
- }
-
- tracker->setReply(std::make_shared<api::SetBucketStateReply>(cmd));
- }
-
- return tracker;
-}
-
-MessageTracker::UP
SplitJoinHandler::handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTracker::UP tracker) const
{
tracker->setMetric(_env._metrics.recheckBucketInfo);
diff --git a/storage/src/vespa/storage/persistence/splitjoinhandler.h b/storage/src/vespa/storage/persistence/splitjoinhandler.h
index ddfa22b154c..4521e520ee9 100644
--- a/storage/src/vespa/storage/persistence/splitjoinhandler.h
+++ b/storage/src/vespa/storage/persistence/splitjoinhandler.h
@@ -21,7 +21,6 @@ public:
SplitJoinHandler(PersistenceUtil &, spi::PersistenceProvider &,
BucketOwnershipNotifier &, bool enableMultibitSplitOptimalization);
MessageTrackerUP handleSplitBucket(api::SplitBucketCommand& cmd, MessageTrackerUP tracker) const;
- MessageTrackerUP handleSetBucketState(api::SetBucketStateCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd, MessageTrackerUP tracker) const;
MessageTrackerUP handleJoinBuckets(api::JoinBucketsCommand& cmd, MessageTrackerUP tracker) const;
private:
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index fa9ab22c1cb..a17c77f6ca4 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -72,6 +72,7 @@ MergeThrottler::Metrics::Metrics(metrics::MetricSet* owner)
: metrics::MetricSet("mergethrottler", {}, "", owner),
averageQueueWaitingTime("averagequeuewaitingtime", {}, "Average time a merge spends in the throttler queue", this),
queueSize("queuesize", {}, "Length of merge queue", this),
+ active_window_size("active_window_size", {}, "Number of merges active within the pending window size", this),
bounced_due_to_back_pressure("bounced_due_to_back_pressure", {}, "Number of merges bounced due to resource exhaustion back-pressure", this),
chaining("mergechains", this),
local("locallyexecutedmerges", this)
@@ -366,6 +367,7 @@ MergeThrottler::removeActiveMerge(ActiveMergeMap::iterator mergeIter)
LOG(debug, "Removed merge for %s from internal state",
mergeIter->first.toString().c_str());
_merges.erase(mergeIter);
+ update_active_merge_window_size_metric();
}
api::StorageMessage::SP
@@ -815,6 +817,7 @@ MergeThrottler::processNewMergeCommand(
// merge throttling window.
assert(_merges.find(mergeCmd.getBucket()) == _merges.end());
auto state = _merges.emplace(mergeCmd.getBucket(), ChainedMergeState(msg)).first;
+ update_active_merge_window_size_metric();
LOG(debug, "Added merge %s to internal state",
mergeCmd.toString().c_str());
@@ -1247,6 +1250,11 @@ MergeThrottler::set_disable_queue_limits_for_chained_merges(bool disable_limits)
}
void
+MergeThrottler::update_active_merge_window_size_metric() noexcept {
+ _metrics->active_window_size.set(static_cast<int64_t>(_merges.size()));
+}
+
+void
MergeThrottler::print(std::ostream& out, bool /*verbose*/,
const std::string& /*indent*/) const
{
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 9b0fb125b2f..997477a4b70 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -64,6 +64,7 @@ public:
public:
metrics::DoubleAverageMetric averageQueueWaitingTime;
metrics::LongValueMetric queueSize;
+ metrics::LongValueMetric active_window_size;
metrics::LongCountMetric bounced_due_to_back_pressure;
MergeOperationMetrics chaining;
MergeOperationMetrics local;
@@ -388,6 +389,8 @@ private:
void rejectOperationsInThreadQueue(MessageGuard&, uint32_t minimumStateVersion);
void markActiveMergesAsAborted(uint32_t minimumStateVersion);
+ void update_active_merge_window_size_metric() noexcept;
+
// const function, but metrics are mutable
void updateOperationMetrics(
const api::ReturnCode& result,