diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-11-21 22:13:54 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-11-21 22:13:54 +0000 |
commit | 600a2b165718ec0246b524a0d0700e6801410e9d (patch) | |
tree | 38a631c4d02311f27252de46b572b6700f19201a /storage | |
parent | 1871a395a83fa7a08c373ba80fcc20eb76ad775a (diff) |
Defer GC bucket info merge until all responses have been received
Avoids causing false positives in the merge pending metrics due to
partial (and mismatching) bucket info being merged into the DB one
by one as responses are received. Instead, wait until all responses
have been received and merge as one atomic operation.
This fixes #11373
Diffstat (limited to 'storage')
3 files changed, 92 insertions, 46 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 122a1452632..88281f3ca20 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -16,49 +16,89 @@ namespace storage::distributor { struct GarbageCollectionOperationTest : Test, DistributorTestUtil { void SetUp() override { createLinks(); + enableDistributorClusterState("distributor:1 storage:2"); + addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300"); + getConfig().setGarbageCollection("music.date < 34", 3600); + getClock().setAbsoluteTimeInSeconds(34); }; void TearDown() override { close(); } -}; -TEST_F(GarbageCollectionOperationTest, simple) { - enableDistributorClusterState("distributor:1 storage:2"); - addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300"); - getConfig().setGarbageCollection("music.date < 34", 3600); + std::shared_ptr<GarbageCollectionOperation> create_op() { + auto op = std::make_shared<GarbageCollectionOperation>( + "storage",BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + toVector<uint16_t>(0, 1))); + op->setIdealStateManager(&getIdealStateManager()); + return op; + } - GarbageCollectionOperation op("storage", - BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), - toVector<uint16_t>(0, 1))); + // FIXME fragile to assume that send order == node index, but that's the way it currently works + void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum) { + auto msg = _sender.command(n); + assert(msg->getType() == api::MessageType::REMOVELOCATION); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); + auto& gc_reply = dynamic_cast<api::RemoveLocationReply&>(*reply); + gc_reply.setBucketInfo(api::BucketInfo(bucket_info_checksum, 90, 500)); - op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender, framework::MilliSecTime(0)); + op.receive(_sender, reply); + } - ASSERT_EQ(2, _sender.commands().size()); + void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) { + BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1)); + ASSERT_TRUE(entry.valid()); + ASSERT_EQ(entry->getNodeCount(), info.size()); + EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time); + for (size_t i = 0; i < info.size(); ++i) { + EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo()) + << "Mismatching info for node " << i << ": " << info[i] << " vs " + << entry->getNode(i)->getBucketInfo(); + } + } +}; - getClock().setAbsoluteTimeInSeconds(34); +TEST_F(GarbageCollectionOperationTest, simple) { + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + + ASSERT_EQ(2, _sender.commands().size()); for (uint32_t i = 0; i < 2; ++i) { std::shared_ptr<api::StorageCommand> msg = _sender.command(i); ASSERT_EQ(msg->getType(), api::MessageType::REMOVELOCATION); - auto& tmp = dynamic_cast<api::RemoveLocationCommand&>(*msg); EXPECT_EQ("music.date < 34", tmp.getDocumentSelection()); + reply_to_nth_request(*op, i, 777 + i); + } + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(777, 90, 500), api::BucketInfo(778, 90, 500)}, 34)); +} - std::shared_ptr<api::StorageReply> reply(tmp.makeReply()); - auto& sreply = dynamic_cast<api::RemoveLocationReply&>(*reply); - sreply.setBucketInfo(api::BucketInfo(666, 90, 500)); +TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) { + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); - op.receive(_sender, reply); - } + // Respond to 1st request. Should _not_ cause bucket info to be merged into the database yet + reply_to_nth_request(*op, 0, 1234); + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); + + // Respond to 2nd request. This _should_ cause bucket info to be merged into the database. + reply_to_nth_request(*op, 1, 4567); + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(1234, 90, 500), api::BucketInfo(4567, 90, 500)}, 34)); +} + +TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) { + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); - BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1)); - ASSERT_TRUE(entry.valid()); - ASSERT_EQ(2, entry->getNodeCount()); - EXPECT_EQ(34, entry->getLastGarbageCollectionTime()); - EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(0).getBucketInfo()); - EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(1).getBucketInfo()); + reply_to_nth_request(*op, 0, 1234); + // Change to replica on node 0 happens after GC op, but before GC info is merged into the DB. Must not be lost. + insertBucketInfo(op->getBucketId(), 0, 7777, 100, 2000); + reply_to_nth_request(*op, 1, 4567); + // Bucket info for node 0 is that of the later sequenced operation, _not_ from the earlier GC op. + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34)); } } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 53d9cc018f9..94151a13b7d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -7,7 +7,6 @@ #include <vespa/storageapi/message/removelocation.h> #include <vespa/log/log.h> - LOG_SETUP(".distributor.operation.idealstate.remove"); using namespace storage::distributor; @@ -17,7 +16,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const std::string& cluste _tracker(clusterName) {} -GarbageCollectionOperation::~GarbageCollectionOperation() { } +GarbageCollectionOperation::~GarbageCollectionOperation() = default; void GarbageCollectionOperation::onStart(DistributorMessageSender& sender) @@ -25,14 +24,13 @@ GarbageCollectionOperation::onStart(DistributorMessageSender& sender) BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); std::vector<uint16_t> nodes = entry->getNodes(); - for (uint32_t i = 0; i < nodes.size(); i++) { - std::shared_ptr<api::RemoveLocationCommand> command( - new api::RemoveLocationCommand( - _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(), - getBucket())); + for (uint32_t node : nodes) { + auto command = std::make_shared<api::RemoveLocationCommand>( + _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(), + getBucket()); command->setPriority(_priority); - _tracker.queueCommand(command, nodes[i]); + _tracker.queueCommand(command, node); } _tracker.flushQueue(sender); @@ -46,35 +44,37 @@ void GarbageCollectionOperation::onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>& reply) { - api::RemoveLocationReply* rep = - dynamic_cast<api::RemoveLocationReply*>(reply.get()); + auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get()); + assert(rep != nullptr); uint16_t node = _tracker.handleReply(*rep); if (!rep->getResult().failed()) { - _manager->getDistributorComponent().updateBucketDatabase( - getBucket(), - BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(), - node, - rep->getBucketInfo())); + _replica_info.emplace_back(_manager->getDistributorComponent().getUniqueTimestamp(), + node, rep->getBucketInfo()); } else { _ok = false; } if (_tracker.finished()) { if (_ok) { - BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId()); - if (dbentry.valid()) { - dbentry->setLastGarbageCollectionTime( - _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime()); - _bucketSpace->getBucketDatabase().update(dbentry); - } + merge_received_bucket_info_into_db(); } - done(); } } +void GarbageCollectionOperation::merge_received_bucket_info_into_db() { + // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. + _manager->getDistributorComponent().updateBucketDatabase(getBucket(), _replica_info); + BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId()); + if (dbentry.valid()) { + dbentry->setLastGarbageCollectionTime( + _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime()); + _bucketSpace->getBucketDatabase().update(dbentry); + } +} + bool GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 88c2615a780..47ea11bb328 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -2,7 +2,9 @@ #pragma once #include "idealstateoperation.h" +#include <vespa/storage/bucketdb/bucketcopy.h> #include <vespa/storage/distributor/messagetracker.h> +#include <vector> namespace storage::distributor { @@ -22,6 +24,10 @@ public: protected: MessageTracker _tracker; +private: + std::vector<BucketCopy> _replica_info; + + void merge_received_bucket_info_into_db(); }; } |