aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp
blob: 710da80972f06cbf7d8b8977e4014db95e3e8178 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <tests/persistence/common/filestortestfixture.h>
#include <tests/persistence/common/persistenceproviderwrapper.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/persistence/spi/test.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/storageapi/message/bucket.h>

using storage::spi::test::makeSpiBucket;
using namespace ::testing;

namespace storage {

/**
 * Test fixture that works, in effect, as an integration test: it exercises
 * the ModifiedBucketChecker storage link together with the behavior of the
 * filestor component.
 */
struct FileStorModifiedBucketsTest : FileStorTestFixture {
    // Marks `count` consecutive buckets, starting at bucket number `first`,
    // as active and hands them to the dummy provider as its modified set.
    void modifyBuckets(uint32_t first, uint32_t count);

    // Gives access to the node's persistence provider as a DummyPersistence.
    // This is a reference dynamic_cast, so a provider of any other dynamic
    // type makes it throw std::bad_cast.
    spi::dummy::DummyPersistence& getDummyPersistence() {
        auto& provider = _node->getPersistenceProvider();
        return dynamic_cast<spi::dummy::DummyPersistence&>(provider);
    }
};

namespace {

// Storage link injector that appends a ModifiedBucketChecker to the link it
// is given. The checker is built from the node's component register and
// persistence provider, plus a StorServerConfig resolved from the fixture's
// config id.
struct BucketCheckerInjector : FileStorTestFixture::StorageLinkInjector
{
    TestServiceLayerApp& _node;
    FileStorTestFixture& _fixture;

    BucketCheckerInjector(TestServiceLayerApp& node, FileStorTestFixture& fixture)
        : _node(node), _fixture(fixture)
    {}

    void inject(DummyStorageLink& link) const override {
        using vespa::config::content::core::StorServerConfig;
        const config::ConfigUri uri(_fixture._config->getConfigId());
        auto server_cfg = config_from<StorServerConfig>(uri);
        auto checker = std::make_unique<ModifiedBucketChecker>(
                _node.getComponentRegister(), _node.getPersistenceProvider(), *server_cfg);
        link.push_back(std::move(checker));
    }
};

// Checks that `msg` is a NotifyBucketChangeCommand whose bucket info is
// flagged active and whose address prints as distributor 0 of the cluster
// named "storage".
//
// The reference dynamic_cast throws std::bad_cast if `msg` has another type.
// The ASSERT_* macros only return from this helper, so the call sites in this
// file wrap it in ASSERT_NO_FATAL_FAILURE.
void
assertIsNotifyCommandWithActiveBucket(api::StorageMessage& msg)
{
    auto& notify = dynamic_cast<api::NotifyBucketChangeCommand&>(msg);
    ASSERT_TRUE(notify.getBucketInfo().isActive());

    const vespalib::string expected_address(
            "StorageMessageAddress(Storage protocol, "
            "cluster storage, nodetype distributor, index 0)");
    ASSERT_EQ(expected_address, notify.getAddress()->toString());
}

}

void
FileStorModifiedBucketsTest::modifyBuckets(uint32_t first, uint32_t count)
{
    spi::BucketIdListResult::List buckets;
    for (uint32_t i = 0; i < count; ++i) {
        buckets.push_back(document::BucketId(16, first + i));
        _node->getPersistenceProvider().setActiveState(
                makeSpiBucket(buckets[i]),
                spi::BucketInfo::ACTIVE);
    }

    getDummyPersistence().setModifiedBuckets(std::move(buckets));
}

// Verifies that marking buckets as modified in the persistence provider
// produces one NotifyBucketChangeCommand per bucket on the top link, and that
// the bucket database entry for each of those buckets is then flagged active.
TEST_F(FileStorModifiedBucketsTest, modified_buckets_send_notify_bucket_change) {
    BucketCheckerInjector bcj(*_node, *this);
    TestFileStorComponents c(*this, bcj);
    setClusterState("storage:1 distributor:1");

    // Single source of truth for the bucket count; every loop and wait below
    // is driven by it.
    const uint32_t numBuckets = 10;

    // Create each bucket and send one put to it.
    for (uint32_t i = 0; i < numBuckets; ++i) {
        document::BucketId bucket(16, i);
        createBucket(makeSpiBucket(bucket));
        c.sendPut(bucket, DocumentIndex(0), PutTimestamp(1000));
    }
    // Wait until one message per put has reached the top link, then clear it
    // so that only messages caused by modifyBuckets() remain to be inspected.
    c.top.waitForMessages(numBuckets, MSG_WAIT_TIME);
    c.top.reset();

    modifyBuckets(0, numBuckets);
    c.top.waitForMessages(numBuckets, MSG_WAIT_TIME);

    // Bounded by numBuckets (not a literal) so that the number of messages
    // inspected always matches the number waited for above.
    for (uint32_t i = 0; i < numBuckets; ++i) {
        ASSERT_NO_FATAL_FAILURE(assertIsNotifyCommandWithActiveBucket(*c.top.getReply(i)));

        StorBucketDatabase::WrappedEntry entry(
                _node->getStorageBucketDatabase().get(
                        document::BucketId(16, i), "foo"));

        EXPECT_TRUE(entry->info.isActive());
    }
}

// Verifies that a second round of modified buckets still produces a notify
// command, i.e. that the recheck commands of the first round were answered.
TEST_F(FileStorModifiedBucketsTest, file_stor_replies_to_recheck_bucket_commands) {
    BucketCheckerInjector injector(*_node, *this);
    TestFileStorComponents components(*this, injector);
    setClusterState("storage:1 distributor:1");

    // Round one: a single bucket with a single put.
    document::BucketId first_bucket(16, 0);
    createBucket(makeSpiBucket(first_bucket));
    components.sendPut(first_bucket, DocumentIndex(0), PutTimestamp(1000));
    components.top.waitForMessages(1, MSG_WAIT_TIME);
    components.top.reset();

    modifyBuckets(0, 1);
    components.top.waitForMessages(1, MSG_WAIT_TIME);
    ASSERT_NO_FATAL_FAILURE(
            assertIsNotifyCommandWithActiveBucket(*components.top.getReply(0)));

    // Round two: unless the recheck bucket commands get a reply, no new round
    // of getModifiedBuckets and recheck commands is triggered, and the wait
    // below never sees its message.
    components.top.reset();
    document::BucketId second_bucket(16, 1);
    createBucket(makeSpiBucket(second_bucket));
    modifyBuckets(1, 1);
    components.top.waitForMessages(1, MSG_WAIT_TIME);
    ASSERT_NO_FATAL_FAILURE(
            assertIsNotifyCommandWithActiveBucket(*components.top.getReply(0)));
}

} // storage