summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-11-25 07:45:27 -0500
committer GitHub <noreply@github.com> 2019-11-25 07:45:27 -0500
commit 9c1321e04b22d2053f4efb6158784624800fbe1b (patch)
tree 25798186fa2349d621fd86fb5dbf0676425a5e54 /storage
parent a274eab5b7e5b893110b1e0e64399d667b94004b (diff)
parent c0df2bf04e7c4f33a8ef4ac7f42dec10b9d95d88 (diff)
Merge pull request #11381 from vespa-engine/vekterli/defer-gc-bucket-info-merge-until-all-responses-received
Defer GC bucket info merge until all responses have been received
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/garbagecollectiontest.cpp 88
-rw-r--r-- storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp 47
-rw-r--r-- storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h 6
3 files changed, 94 insertions, 47 deletions
diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp
index 122a1452632..88281f3ca20 100644
--- a/storage/src/tests/distributor/garbagecollectiontest.cpp
+++ b/storage/src/tests/distributor/garbagecollectiontest.cpp
@@ -16,49 +16,89 @@ namespace storage::distributor {
struct GarbageCollectionOperationTest : Test, DistributorTestUtil {
void SetUp() override {
createLinks();
+ enableDistributorClusterState("distributor:1 storage:2");
+ addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300");
+ getConfig().setGarbageCollection("music.date < 34", 3600);
+ getClock().setAbsoluteTimeInSeconds(34);
};
void TearDown() override {
close();
}
-};
-TEST_F(GarbageCollectionOperationTest, simple) {
- enableDistributorClusterState("distributor:1 storage:2");
- addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300");
- getConfig().setGarbageCollection("music.date < 34", 3600);
+ std::shared_ptr<GarbageCollectionOperation> create_op() {
+ auto op = std::make_shared<GarbageCollectionOperation>(
+ "storage",BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
+ toVector<uint16_t>(0, 1)));
+ op->setIdealStateManager(&getIdealStateManager());
+ return op;
+ }
- GarbageCollectionOperation op("storage",
- BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
- toVector<uint16_t>(0, 1)));
+ // FIXME fragile to assume that send order == node index, but that's the way it currently works
+ void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum) {
+ auto msg = _sender.command(n);
+ assert(msg->getType() == api::MessageType::REMOVELOCATION);
+ std::shared_ptr<api::StorageReply> reply(msg->makeReply());
+ auto& gc_reply = dynamic_cast<api::RemoveLocationReply&>(*reply);
+ gc_reply.setBucketInfo(api::BucketInfo(bucket_info_checksum, 90, 500));
- op.setIdealStateManager(&getIdealStateManager());
- op.start(_sender, framework::MilliSecTime(0));
+ op.receive(_sender, reply);
+ }
- ASSERT_EQ(2, _sender.commands().size());
+ void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) {
+ BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
+ ASSERT_TRUE(entry.valid());
+ ASSERT_EQ(entry->getNodeCount(), info.size());
+ EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
+ for (size_t i = 0; i < info.size(); ++i) {
+ EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo())
+ << "Mismatching info for node " << i << ": " << info[i] << " vs "
+ << entry->getNode(i)->getBucketInfo();
+ }
+ }
+};
- getClock().setAbsoluteTimeInSeconds(34);
+TEST_F(GarbageCollectionOperationTest, simple) {
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+
+ ASSERT_EQ(2, _sender.commands().size());
for (uint32_t i = 0; i < 2; ++i) {
std::shared_ptr<api::StorageCommand> msg = _sender.command(i);
ASSERT_EQ(msg->getType(), api::MessageType::REMOVELOCATION);
-
auto& tmp = dynamic_cast<api::RemoveLocationCommand&>(*msg);
EXPECT_EQ("music.date < 34", tmp.getDocumentSelection());
+ reply_to_nth_request(*op, i, 777 + i);
+ }
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(777, 90, 500), api::BucketInfo(778, 90, 500)}, 34));
+}
- std::shared_ptr<api::StorageReply> reply(tmp.makeReply());
- auto& sreply = dynamic_cast<api::RemoveLocationReply&>(*reply);
- sreply.setBucketInfo(api::BucketInfo(666, 90, 500));
+TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) {
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
- op.receive(_sender, reply);
- }
+ // Respond to 1st request. Should _not_ cause bucket info to be merged into the database yet
+ reply_to_nth_request(*op, 0, 1234);
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));
+
+ // Respond to 2nd request. This _should_ cause bucket info to be merged into the database.
+ reply_to_nth_request(*op, 1, 4567);
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(1234, 90, 500), api::BucketInfo(4567, 90, 500)}, 34));
+}
+
+TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
+ auto op = create_op();
+ op->start(_sender, framework::MilliSecTime(0));
+ ASSERT_EQ(2, _sender.commands().size());
- BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
- ASSERT_TRUE(entry.valid());
- ASSERT_EQ(2, entry->getNodeCount());
- EXPECT_EQ(34, entry->getLastGarbageCollectionTime());
- EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(0).getBucketInfo());
- EXPECT_EQ(api::BucketInfo(666, 90, 500), entry->getNodeRef(1).getBucketInfo());
+ reply_to_nth_request(*op, 0, 1234);
+ // Change to replica on node 0 happens after GC op, but before GC info is merged into the DB. Must not be lost.
+ insertBucketInfo(op->getBucketId(), 0, 7777, 100, 2000);
+ reply_to_nth_request(*op, 1, 4567);
+ // Bucket info for node 0 is that of the later sequenced operation, _not_ from the earlier GC op.
+ ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34));
}
} // storage::distributor
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 53d9cc018f9..c674add80f7 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -7,17 +7,17 @@
#include <vespa/storageapi/message/removelocation.h>
#include <vespa/log/log.h>
-
LOG_SETUP(".distributor.operation.idealstate.remove");
using namespace storage::distributor;
GarbageCollectionOperation::GarbageCollectionOperation(const std::string& clusterName, const BucketAndNodes& nodes)
: IdealStateOperation(nodes),
- _tracker(clusterName)
+ _tracker(clusterName),
+ _replica_info()
{}
-GarbageCollectionOperation::~GarbageCollectionOperation() { }
+GarbageCollectionOperation::~GarbageCollectionOperation() = default;
void
GarbageCollectionOperation::onStart(DistributorMessageSender& sender)
@@ -25,14 +25,13 @@ GarbageCollectionOperation::onStart(DistributorMessageSender& sender)
BucketDatabase::Entry entry = _bucketSpace->getBucketDatabase().get(getBucketId());
std::vector<uint16_t> nodes = entry->getNodes();
- for (uint32_t i = 0; i < nodes.size(); i++) {
- std::shared_ptr<api::RemoveLocationCommand> command(
- new api::RemoveLocationCommand(
- _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(),
- getBucket()));
+ for (auto node : nodes) {
+ auto command = std::make_shared<api::RemoveLocationCommand>(
+ _manager->getDistributorComponent().getDistributor().getConfig().getGarbageCollectionSelection(),
+ getBucket());
command->setPriority(_priority);
- _tracker.queueCommand(command, nodes[i]);
+ _tracker.queueCommand(command, node);
}
_tracker.flushQueue(sender);
@@ -46,35 +45,37 @@ void
GarbageCollectionOperation::onReceive(DistributorMessageSender&,
const std::shared_ptr<api::StorageReply>& reply)
{
- api::RemoveLocationReply* rep =
- dynamic_cast<api::RemoveLocationReply*>(reply.get());
+ auto* rep = dynamic_cast<api::RemoveLocationReply*>(reply.get());
+ assert(rep != nullptr);
uint16_t node = _tracker.handleReply(*rep);
if (!rep->getResult().failed()) {
- _manager->getDistributorComponent().updateBucketDatabase(
- getBucket(),
- BucketCopy(_manager->getDistributorComponent().getUniqueTimestamp(),
- node,
- rep->getBucketInfo()));
+ _replica_info.emplace_back(_manager->getDistributorComponent().getUniqueTimestamp(),
+ node, rep->getBucketInfo());
} else {
_ok = false;
}
if (_tracker.finished()) {
if (_ok) {
- BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
- if (dbentry.valid()) {
- dbentry->setLastGarbageCollectionTime(
- _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime());
- _bucketSpace->getBucketDatabase().update(dbentry);
- }
+ merge_received_bucket_info_into_db();
}
-
done();
}
}
+void GarbageCollectionOperation::merge_received_bucket_info_into_db() {
+ // TODO avoid two separate DB ops for this. Current API currently does not make this elegant.
+ _manager->getDistributorComponent().updateBucketDatabase(getBucket(), _replica_info);
+ BucketDatabase::Entry dbentry = _bucketSpace->getBucketDatabase().get(getBucketId());
+ if (dbentry.valid()) {
+ dbentry->setLastGarbageCollectionTime(
+ _manager->getDistributorComponent().getClock().getTimeInSeconds().getTime());
+ _bucketSpace->getBucketDatabase().update(dbentry);
+ }
+}
+
bool
GarbageCollectionOperation::shouldBlockThisOperation(uint32_t, uint8_t) const
{
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
index 88c2615a780..47ea11bb328 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h
@@ -2,7 +2,9 @@
#pragma once
#include "idealstateoperation.h"
+#include <vespa/storage/bucketdb/bucketcopy.h>
#include <vespa/storage/distributor/messagetracker.h>
+#include <vector>
namespace storage::distributor {
@@ -22,6 +24,10 @@ public:
protected:
MessageTracker _tracker;
+private:
+ std::vector<BucketCopy> _replica_info;
+
+ void merge_received_bucket_info_into_db();
};
}