summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-11-21 22:13:54 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-11-21 22:13:54 +0000
commit600a2b165718ec0246b524a0d0700e6801410e9d (patch)
tree38a631c4d02311f27252de46b572b6700f19201a /storage
parent1871a395a83fa7a08c373ba80fcc20eb76ad775a (diff)
Defer GC bucket info merge until all responses have been received
Avoids causing false positives in the merge-pending metrics due to partial (and mismatching) bucket info being merged into the DB one replica at a time as responses arrive. Instead, the operation now waits until all responses have been received and merges them as one atomic operation. This fixes #11373.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/garbagecollectiontest.cpp88
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp44
-rw-r--r--storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h6
3 files changed, 92 insertions, 46 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 122a1452632..88281f3ca20 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -16,49 +16,89 @@ namespace storage::distributor {
struct GarbageCollectionOperationTest : Test, DistributorTestUtil {
void SetUp() override {
createLinks();
+ enableDistributorClusterState("distributor:1 storage:2");
+ addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300");
+ getConfig().setGarbageCollection("music.date < 34", 3600);
+ getClock().setAbsoluteTimeInSeconds(34);
};
void TearDown() override {
close();
}
-};
-TEST_F(GarbageCollectionOperationTest, simple) {
- enableDistributorClusterState("distributor:1 storage:2");
- addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300");
- getConfig().setGarbageCollection("music.date < 34", 3600);
+ std::shared_ptr<GarbageCollectionOperation> create_op() {
+ auto op = std::make_shared<GarbageCollectionOperation>(
+ "storage",BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ toVector<uint16_t>(0, 1)));
+ op->setIdealStateManager(&getIdealStateManager());
+ return op;
+ }
- GarbageCollectionOperation op("storage",
- BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
- toVector<uint16_t>(0, 1)));
+ // FIXME fragile to assume that send order == node index, but that's the way it currently works
+ void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum) {
+ auto msg = _sender.command(n);
+ assert(msg->getType() == api::MessageType::REMOVELOCATION);
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+ auto& gc_reply = dynamic_cast<api::RemoveLocationReply&>(*reply);
+ gc_reply.setBucketInfo(api::BucketInfo(bucket_info_checksum, 90, 500));
- op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.receive(_sender, reply);
+ }
- ASSERT_EQ(2, _sender.commands().size());
+ void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) {
+ BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
+ ASSERT_TRUE(entry.valid());
+ ASSERT_EQ(entry->getNodeCount(), info.size());
+ EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
+ for (size_t i = 0; i < info.size(); ++i) {
+ EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo())
+ << "Mismatching info for node " << i << ": " << info[i] << " vs "
+ << entry->getNode(i)->getBucketInfo();
+ }
+ }
+};
- getClock().setAbsoluteTimeInSeconds(34);
+TEST_F(GarbageCollectionOperationTest, simple) {
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ(2, _sender.commands().size());
for (uint32_t i = 0; i < 2; ++i) {
std::shared_ptr<api::StorageCommand> msg = _sender.command(i);
ASSERT_EQ(msg->getType(), api::MessageType::REMOVELOCATION);
-
auto& tmp = dynamic_cast<api::RemoveLocationCommand&>(*msg);
EXPECT_EQ("music.date < 34", tmp.getDocumentSelection());
+ reply_to_nth_request(*op, i, 777 + i);
+ }
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(777, 90, 500), api::BucketInfo(778, 90, 500)}, 34));
+}
- std::shared_ptr<api::StorageReply> reply(tmp.makeReply());
- auto& sreply = dynamic_cast<api::RemoveLocationReply&>(*reply);
- sreply.setBucketInfo(api::BucketInfo(666, 90, 500));
+TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) {
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
- op.receive(_sender, reply);
- }
+ // Respond to 1st request. Should _not_ cause bucket info to be merged into the database yet
+ reply_to_nth_request(*op, 0, 1234);
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+
+ // Respond to 2nd request. This _should_ cause bucket info to be merged into the database.
+ reply_to_nth_request(*op, 1, 4567);
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(1234, 90, 500), api::BucketInfo(4567, 90, 500)}, 34));
+}
+
+TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
- BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
- ASSERT_TRUE(entry.valid());
- ASSERT_EQ(2, entry->getNodeCount());
- EXPECT_EQ(34, entry->getLastGarbageCollectionTime());
- EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(0).getBucketInfo());
- EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(1).getBucketInfo());
+ reply_to_nth_request(*op, 0, 1234);
+ // Change to replica on node 0 happens after GC op, but before GC info is merged into the DB. Must not be lost.
+ insertBucketInfo(op->getBucketId(), 0, 7777, 100, 2000);
+ reply_to_nth_request(*op, 1, 4567);
+ // Bucket info for node 0 is that of the later sequenced operation, _not_ from the earlier GC op.
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34));
}
} // storage::distributor
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 53d9cc018f9..94151a13b7d 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -7,7 +7,6 @@
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/log/log.h>
-
LOG_SETUP(".distributor.operation.idealstate.remove");
using namespace storage::distributor;
@@ -17,7 +16,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const std::string& cluste
_tracker(clusterName)
{}
-GarbageCollectionOperation::~GarbageCollectionOperation() { }
+GarbageCollectionOperation::~GarbageCollectionOperation() = default;
void
GarbageCollectionOperation::onStart(DistributorMessageSender& sender)
@@ -25,14 +24,13 @@ GarbageCollectionOperation::onStart(DistributorMessageSender& sender)
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
std::vector<uint16_t> nodes = entry->getNodes();
- for (uint32_t i = 0; i < nodes.size(); i++) {
- std::shared_ptr<api::RemoveLocationCommand> command(
- new api::RemoveLocationCommand(
- _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(),
- getBucket()));
+ for (uint32_t node : nodes) {
+ auto command = std::make_shared<api::RemoveLocationCommand>(
+ _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(),
+ getBucket());
command->setPriority(_priority);
- _tracker.queueCommand(command, nodes[i]);
+ _tracker.queueCommand(command, node);
}
_tracker.flushQueue(sender);
@@ -46,35 +44,37 @@ void
GarbageCollectionOperation::onReceive(DistributorMessageSender&,
const std::shared_ptr<api::StorageReply>& reply)
{
- api::RemoveLocationReply* rep =
- dynamic_cast<api::RemoveLocationReply*>(reply.get());
+ auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get());
+ assert(rep != nullptr);
uint16_t node = _tracker.handleReply(*rep);
if (!rep->getResult().failed()) {
- _manager->getDistributorComponent().updateBucketDatabase(
- getBucket(),
- BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(),
- node,
- rep->getBucketInfo()));
+ _replica_info.emplace_back(_manager->getDistributorComponent().getUniqueTimestamp(),
+ node, rep->getBucketInfo());
} else {
_ok = false;
}
if (_tracker.finished()) {
if (_ok) {
- BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
- if (dbentry.valid()) {
- dbentry->setLastGarbageCollectionTime(
- _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime());
- _bucketSpace->getBucketDatabase().update(dbentry);
- }
+ merge_received_bucket_info_into_db();
}
-
done();
}
}
+void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
+ // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
+ _manager->getDistributorComponent().updateBucketDatabase(getBucket(), _replica_info);
+ BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
+ if (dbentry.valid()) {
+ dbentry->setLastGarbageCollectionTime(
+ _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime());
+ _bucketSpace->getBucketDatabase().update(dbentry);
+ }
+}
+
bool
GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 88c2615a780..47ea11bb328 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -2,7 +2,9 @@
#pragma once
#include "idealstateoperation.h"
+#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
+#include <vector>
namespace storage::distributor {
@@ -22,6 +24,10 @@ public:
protected:
MessageTracker _tracker;
+private:
+ std::vector<BucketCopy> _replica_info;
+
+ void merge_received_bucket_info_into_db();
};
}