diff options
3 files changed, 92 insertions, 46 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 122a1452632..88281f3ca20 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -16,49 +16,89 @@ namespace storage::distributor { struct GarbageCollectionOperationTest : Test, DistributorTestUtil { void SetUp() override { createLinks(); + enableDistributorClusterState("distributor:1 storage:2"); + addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300"); + getConfig().setGarbageCollection("music.date < 34", 3600); + getClock().setAbsoluteTimeInSeconds(34); }; void TearDown() override { close(); } -}; -TEST_F(GarbageCollectionOperationTest, simple) { - enableDistributorClusterState("distributor:1 storage:2"); - addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300"); - getConfig().setGarbageCollection("music.date < 34", 3600); + std::shared_ptr<GarbageCollectionOperation> create_op() { + auto op = std::make_shared<GarbageCollectionOperation>( + "storage",BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), + toVector<uint16_t>(0, 1))); + op->setIdealStateManager(&getIdealStateManager()); + return op; + } - GarbageCollectionOperation op("storage", - BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), - toVector<uint16_t>(0, 1))); + // FIXME fragile to assume that send order == node index, but that's the way it currently works + void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum) { + auto msg = _sender.command(n); + assert(msg->getType() == api::MessageType::REMOVELOCATION); + std::shared_ptr<api::StorageReply> reply(msg->makeReply()); + auto& gc_reply = dynamic_cast<api::RemoveLocationReply&>(*reply); + gc_reply.setBucketInfo(api::BucketInfo(bucket_info_checksum, 90, 500)); - op.setIdealStateManager(&getIdealStateManager()); - op.start(_sender, framework::MilliSecTime(0)); + op.receive(_sender, reply); + } - ASSERT_EQ(2, _sender.commands().size()); + void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) { + BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1)); + ASSERT_TRUE(entry.valid()); + ASSERT_EQ(entry->getNodeCount(), info.size()); + EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time); + for (size_t i = 0; i < info.size(); ++i) { + EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo()) + << "Mismatching info for node " << i << ": " << info[i] << " vs " + << entry->getNode(i)->getBucketInfo(); + } + } +}; - getClock().setAbsoluteTimeInSeconds(34); +TEST_F(GarbageCollectionOperationTest, simple) { + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + + ASSERT_EQ(2, _sender.commands().size()); for (uint32_t i = 0; i < 2; ++i) { std::shared_ptr<api::StorageCommand> msg = _sender.command(i); ASSERT_EQ(msg->getType(), api::MessageType::REMOVELOCATION); - auto& tmp = dynamic_cast<api::RemoveLocationCommand&>(*msg); EXPECT_EQ("music.date < 34", tmp.getDocumentSelection()); + reply_to_nth_request(*op, i, 777 + i); + } + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(777, 90, 500), api::BucketInfo(778, 90, 500)}, 34)); +} - std::shared_ptr<api::StorageReply> reply(tmp.makeReply()); - auto& sreply = dynamic_cast<api::RemoveLocationReply&>(*reply); - sreply.setBucketInfo(api::BucketInfo(666, 90, 500)); +TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) { + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); - op.receive(_sender, reply); - } + // Respond to 1st request. Should _not_ cause bucket info to be merged into the database yet + reply_to_nth_request(*op, 0, 1234); + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); + + // Respond to 2nd request. This _should_ cause bucket info to be merged into the database. + reply_to_nth_request(*op, 1, 4567); + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(1234, 90, 500), api::BucketInfo(4567, 90, 500)}, 34)); +} + +TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) { + auto op = create_op(); + op->start(_sender, framework::MilliSecTime(0)); + ASSERT_EQ(2, _sender.commands().size()); - BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1)); - ASSERT_TRUE(entry.valid()); - ASSERT_EQ(2, entry->getNodeCount()); - EXPECT_EQ(34, entry->getLastGarbageCollectionTime()); - EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(0).getBucketInfo()); - EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(1).getBucketInfo()); + reply_to_nth_request(*op, 0, 1234); + // Change to replica on node 0 happens after GC op, but before GC info is merged into the DB. Must not be lost. + insertBucketInfo(op->getBucketId(), 0, 7777, 100, 2000); + reply_to_nth_request(*op, 1, 4567); + // Bucket info for node 0 is that of the later sequenced operation, _not_ from the earlier GC op. + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34)); } } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 53d9cc018f9..94151a13b7d 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -7,7 +7,6 @@ #include <vespa/storageapi/message/removelocation.h> #include <vespa/log/log.h> - LOG_SETUP(".distributor.operation.idealstate.remove"); using namespace storage::distributor; @@ -17,7 +16,7 @@ GarbageCollectionOperation::GarbageCollectionOperation(const std::string& cluste _tracker(clusterName) {} -GarbageCollectionOperation::~GarbageCollectionOperation() { } +GarbageCollectionOperation::~GarbageCollectionOperation() = default; void GarbageCollectionOperation::onStart(DistributorMessageSender& sender) @@ -25,14 +24,13 @@ GarbageCollectionOperation::onStart(DistributorMessageSender& sender) BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId()); std::vector<uint16_t> nodes = entry->getNodes(); - for (uint32_t i = 0; i < nodes.size(); i++) { - std::shared_ptr<api::RemoveLocationCommand> command( - new api::RemoveLocationCommand( - _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(), - getBucket())); + for (uint32_t node : nodes) { + auto command = std::make_shared<api::RemoveLocationCommand>( + _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(), + getBucket()); command->setPriority(_priority); - _tracker.queueCommand(command, nodes[i]); + _tracker.queueCommand(command, node); } _tracker.flushQueue(sender); @@ -46,35 +44,37 @@ void GarbageCollectionOperation::onReceive(DistributorMessageSender&, const std::shared_ptr<api::StorageReply>& reply) { - api::RemoveLocationReply* rep = - dynamic_cast<api::RemoveLocationReply*>(reply.get()); + auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get()); + assert(rep != nullptr); uint16_t node = _tracker.handleReply(*rep); if (!rep->getResult().failed()) { - _manager->getDistributorComponent().updateBucketDatabase( - getBucket(), - BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(), - node, - rep->getBucketInfo())); + _replica_info.emplace_back(_manager->getDistributorComponent().getUniqueTimestamp(), + node, rep->getBucketInfo()); } else { _ok = false; } if (_tracker.finished()) { if (_ok) { - BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId()); - if (dbentry.valid()) { - dbentry->setLastGarbageCollectionTime( - _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime()); - _bucketSpace->getBucketDatabase().update(dbentry); - } + merge_received_bucket_info_into_db(); } - done(); } } +void GarbageCollectionOperation::merge_received_bucket_info_into_db() { + // TODO avoid two separate DB ops for this. Current API currently does not make this elegant. + _manager->getDistributorComponent().updateBucketDatabase(getBucket(), _replica_info); + BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId()); + if (dbentry.valid()) { + dbentry->setLastGarbageCollectionTime( + _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime()); + _bucketSpace->getBucketDatabase().update(dbentry); + } +} + bool GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h index 88c2615a780..47ea11bb328 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h @@ -2,7 +2,9 @@ #pragma once #include "idealstateoperation.h" +#include <vespa/storage/bucketdb/bucketcopy.h> #include <vespa/storage/distributor/messagetracker.h> +#include <vector> namespace storage::distributor { @@ -22,6 +24,10 @@ public: protected: MessageTracker _tracker; +private: + std::vector<BucketCopy> _replica_info; + + void merge_received_bucket_info_into_db(); }; } |