aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/tests/proton/documentdb/move_operation_limiter/move_operation_limiter_test.cpp
blob: 62530e9de7b2aa4bdad3b9b7a13abfbd64e39d2d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/searchcore/proton/server/i_blockable_maintenance_job.h>
#include <vespa/searchcore/proton/server/move_operation_limiter.h>
#include <vespa/vespalib/testkit/testapp.h>
#include <queue>

#include <vespa/log/log.h>
LOG_SETUP("move_operation_limiter_test");

using namespace proton;

/**
 * Test double for a blockable maintenance job.
 *
 * It records the blocked state in a plain flag so that tests can observe
 * whether the code under test has blocked or unblocked the job. Each
 * transition also verifies that the reason is OUTSTANDING_OPS and that the
 * job was in the opposite state beforehand (no double block / double unblock).
 */
struct MyBlockableMaintenanceJob : public IBlockableMaintenanceJob {
    // True between a setBlocked() call and the matching unBlock() call.
    bool blocked = false;

    // NOTE(review): the two 1s arguments are presumably the job's delay and
    // interval - confirm against the IBlockableMaintenanceJob constructor.
    MyBlockableMaintenanceJob()
        : IBlockableMaintenanceJob("my_job", 1s, 1s)
    {
    }

    // Marks the job as blocked; expects that it was not blocked already.
    void setBlocked(BlockedReason reason) override
    {
        ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS);
        EXPECT_FALSE(blocked);
        blocked = true;
    }

    // Marks the job as runnable again; expects that it was blocked.
    void unBlock(BlockedReason reason) override
    {
        ASSERT_TRUE(reason == BlockedReason::OUTSTANDING_OPS);
        EXPECT_TRUE(blocked);
        blocked = false;
    }

    // The job does no work of its own; it always returns true.
    bool run() override
    {
        return true;
    }

    // Nothing to clean up when the job is stopped.
    void onStop() override
    {
    }
};

/**
 * Shared test fixture: a limiter wired to a test job, plus a FIFO of the
 * callback handles for operations that have been started but not yet ended.
 */
struct Fixture {
    using OpsQueue = std::queue<std::shared_ptr<vespalib::IDestructorCallback>>;
    using MoveOperationLimiterSP = std::shared_ptr<MoveOperationLimiter>;

    // Declaration order matters: 'job' must be constructed before 'limiter',
    // which is handed a pointer to it.
    MyBlockableMaintenanceJob job;
    MoveOperationLimiterSP limiter;
    OpsQueue ops;

    // Creates a limiter attached to 'job' with the given max outstanding ops.
    Fixture(uint32_t maxOutstandingOps = 2)
        : limiter(std::make_shared<MoveOperationLimiter>(&job, maxOutstandingOps))
    {
    }

    // Starts one operation and keeps its callback handle alive in the queue.
    void beginOp()
    {
        ops.emplace(limiter->beginOperation());
    }

    // Ends the oldest outstanding operation by dropping its queued handle.
    // Precondition: at least one operation has been started and not yet ended.
    void endOp()
    {
        ops.pop();
    }

    // Asks the limiter to clear its job.
    void clearJob()
    {
        limiter->clearJob();
    }

    // Releases the fixture's own reference to the limiter.
    void clearLimiter()
    {
        limiter.reset();
    }

    // Expects that the limiter reports being above its limit and that the
    // job has been blocked.
    void assertAboveLimit() const
    {
        EXPECT_TRUE(limiter->isAboveLimit());
        EXPECT_TRUE(job.blocked);
    }

    // Expects that the limiter reports not being above its limit and that
    // the job is not blocked.
    void assertBelowLimit() const
    {
        EXPECT_FALSE(limiter->isAboveLimit());
        EXPECT_FALSE(job.blocked);
    }
};

// Checks that hasPending() is false before any operation is started, true
// while one operation is held, and false again after that operation is ended.
TEST_F("require that hasPending reflects if any jobs are outstanding", Fixture)
{
    EXPECT_FALSE(f.limiter->hasPending());
    f.beginOp();
    EXPECT_TRUE(f.limiter->hasPending());
    f.endOp(); // drops the only queued callback handle
    EXPECT_FALSE(f.limiter->hasPending());
}

// Walks the number of outstanding operations 1 -> 2 -> 3 -> 2 -> 1 -> 0 using
// the fixture's default max of 2, and checks the limiter/job state after each
// step. The expectations show the job stays blocked while two or more
// operations are outstanding, and is unblocked once only one remains.
TEST_F("require that job is blocked / unblocked when crossing max outstanding ops boundaries", Fixture)
{
    f.beginOp(); // 1 outstanding
    TEST_DO(f.assertBelowLimit());
    f.beginOp(); // 2 outstanding
    TEST_DO(f.assertAboveLimit());
    f.beginOp(); // 3 outstanding
    TEST_DO(f.assertAboveLimit());
    f.endOp(); // 2 outstanding
    TEST_DO(f.assertAboveLimit());
    f.endOp(); // 1 outstanding
    TEST_DO(f.assertBelowLimit());
    f.endOp(); // 0 outstanding
    TEST_DO(f.assertBelowLimit());
}

// After clearJob() has been called on the limiter, starting the second
// operation still makes the limiter report being above the limit, but the
// job's blocked flag must stay false.
TEST_F("require that cleared job is not blocked when crossing max ops boundary", Fixture)
{
    f.beginOp();
    f.clearJob();
    f.beginOp(); // second operation; would block the job in the previous test
    EXPECT_FALSE(f.job.blocked);
    EXPECT_TRUE(f.limiter->isAboveLimit());
}

// Blocks the job by starting two operations, then calls clearJob() before
// ending one of them. The limiter must report no longer being above the
// limit, while the job's blocked flag must stay true.
TEST_F("require that cleared job is not unblocked when crossing max ops boundary", Fixture)
{
    f.beginOp();
    f.beginOp();
    TEST_DO(f.assertAboveLimit());
    f.clearJob();
    f.endOp(); // back to one outstanding operation
    EXPECT_TRUE(f.job.blocked);
    EXPECT_FALSE(f.limiter->isAboveLimit());
}

// Blocks the job with two outstanding operations, then drops the fixture's
// own shared_ptr to the limiter before ending one operation. The job must
// still become unblocked; per the test name, this works because the callback
// handles hold their own shared reference to the limiter.
TEST_F("require that destructor callback has reference to limiter via shared ptr", Fixture)
{
    f.beginOp();
    f.beginOp();
    TEST_DO(f.assertAboveLimit());
    f.clearLimiter(); // f.limiter is empty from here on; do not dereference it
    f.endOp();
    EXPECT_FALSE(f.job.blocked);
}

// Test program entry point, defined through the testkit macros.
// NOTE(review): TEST_RUN_ALL() presumably runs every TEST_F case registered
// above - confirm against vespalib's testkit.
TEST_MAIN()
{
    TEST_RUN_ALL();
}