path: root/storage/src/tests/distributor/garbagecollectiontest.cpp
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/storageapi/message/removelocation.h>
#include <vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.h>
#include <vespa/storage/distributor/idealstatemanager.h>
#include <tests/distributor/distributortestutil.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <cassert>

using document::test::makeDocumentBucket;
using namespace ::testing;

namespace storage::distributor {

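// Fixture with a single bucket (16, 1) replicated on storage nodes 0 and 1
// (each replica with bucket info 250/50/300), a GC selection of
// "music.date < 34", a GC check interval of 3600s, and the clock set to 34 seconds.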
struct GarbageCollectionOperationTest : Test, DistributorTestUtil {
    void SetUp() override {
        createLinks();
        enableDistributorClusterState("distributor:1 storage:2");
        addNodesToBucketDB(document::BucketId(16, 1), "0=250/50/300,1=250/50/300");
        getConfig().setGarbageCollection("music.date < 34", 3600s);
        getClock().setAbsoluteTimeInSeconds(34);
    }

    void TearDown() override {
        close();
    }

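    // Creates a GC operation for bucket (16, 1) targeting replica nodes 0 and 1.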
    std::shared_ptr<GarbageCollectionOperation> create_op() {
        auto op = std::make_shared<GarbageCollectionOperation>(
                "storage", BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)),
                                          toVector<uint16_t>(0, 1)));
        op->setIdealStateManager(&getIdealStateManager());
        return op;
    }

    // FIXME fragile to assume that send order == node index, but that's the way it currently works
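    // Replies to the n'th sent RemoveLocation command with the given bucket info
    // checksum (document count 90, total size 500) and hands the reply to the operation.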
    void reply_to_nth_request(GarbageCollectionOperation& op, size_t n, uint32_t bucket_info_checksum) {
        auto msg = _sender.command(n);
        assert(msg->getType() == api::MessageType::REMOVELOCATION);
        std::shared_ptr<api::StorageReply> reply(msg->makeReply());
        auto& gc_reply = dynamic_cast<api::RemoveLocationReply&>(*reply);
        gc_reply.setBucketInfo(api::BucketInfo(bucket_info_checksum, 90, 500));

        op.receive(_sender, reply);
    }

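    // Verifies that the DB entry for bucket (16, 1) has one replica per element in
    // `info`, with matching bucket info, and the given last garbage collection timestamp.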
    void assert_bucket_db_contains(std::vector<api::BucketInfo> info, uint32_t last_gc_time) {
        BucketDatabase::Entry entry = getBucket(document::BucketId(16, 1));
        ASSERT_TRUE(entry.valid());
        ASSERT_EQ(entry->getNodeCount(), info.size());
        EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time);
        for (size_t i = 0; i < info.size(); ++i) {
            EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo())
                    << "Mismatching info for node " << i << ": " << info[i] << " vs "
                    << entry->getNode(i)->getBucketInfo();
        }
    }
};

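// Happy path: a started GC operation sends a RemoveLocation command with the configured
// selection to each replica node; once both replies are in, the bucket DB is updated
// with the returned bucket infos and the current GC timestamp.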
TEST_F(GarbageCollectionOperationTest, simple) {
    auto op = create_op();
    op->start(_sender, framework::MilliSecTime(0));

    ASSERT_EQ(2, _sender.commands().size());

    for (uint32_t i = 0; i < 2; ++i) {
        std::shared_ptr<api::StorageCommand> msg = _sender.command(i);
        ASSERT_EQ(msg->getType(), api::MessageType::REMOVELOCATION);
        auto& tmp = dynamic_cast<api::RemoveLocationCommand&>(*msg);
        EXPECT_EQ("music.date < 34", tmp.getDocumentSelection());
        reply_to_nth_request(*op, i, 777 + i);
    }
    ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(777, 90, 500), api::BucketInfo(778, 90, 500)}, 34));
}

TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until_all_replies_received) {
    auto op = create_op();
    op->start(_sender, framework::MilliSecTime(0));
    ASSERT_EQ(2, _sender.commands().size());

    // Respond to 1st request. Should _not_ cause bucket info to be merged into the database yet
    reply_to_nth_request(*op, 0, 1234);
    ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0));

    // Respond to 2nd request. This _should_ cause bucket info to be merged into the database.
    reply_to_nth_request(*op, 1, 4567);
    ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(1234, 90, 500), api::BucketInfo(4567, 90, 500)}, 34));
}

TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) {
    auto op = create_op();
    op->start(_sender, framework::MilliSecTime(0));
    ASSERT_EQ(2, _sender.commands().size());

    reply_to_nth_request(*op, 0, 1234);
    // Change to replica on node 0 happens after GC op, but before GC info is merged into the DB. Must not be lost.
    insertBucketInfo(op->getBucketId(), 0, 7777, 100, 2000);
    reply_to_nth_request(*op, 1, 4567);
    // Bucket info for node 0 is that of the later sequenced operation, _not_ from the earlier GC op.
    ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(7777, 100, 2000), api::BucketInfo(4567, 90, 500)}, 34));
}

} // storage::distributor