aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
blob: 954162d4d3db5309eae6a5b3729e119417204d82 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.reindexing;

import ai.vespa.reindexing.Reindexer.Cluster;
import ai.vespa.reindexing.Reindexing.Status;
import ai.vespa.reindexing.Reindexing.Trigger;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.test.MockMetric;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.schema.derived.Deriver;
import com.yahoo.test.ManualClock;
import com.yahoo.vespa.curator.mock.MockCurator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Tests for {@link Reindexer}: construction checks, locking, the visitor parameters it builds,
 * and the reindexing status it records in a {@link ReindexingCurator} backed by a {@link MockCurator}.
 *
 * @author jonmv
 */
class ReindexerTest {

    /**
     * Stand-in for the function a {@link Reindexer} uses to start visiting with the given parameters,
     * for cases where no visiting is expected: it fails the test if it is ever invoked.
     */
    static Runnable failIfCalled(VisitorParameters ignored) { throw new AssertionError("Not supposed to run"); }

    final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
    final DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
    final DocumentType music = manager.getDocumentType("music");
    final Cluster cluster = new Cluster("cluster", Map.of(music, "default"));
    final MockMetric metric = new MockMetric();
    final ManualClock clock = new ManualClock(Instant.EPOCH);

    ReindexingCurator database;

    /** Gives every test a fresh curator-backed reindexing database. */
    @BeforeEach
    void setUp() {
        database = new ReindexingCurator(new MockCurator(), manager, Duration.ofMillis(1));
    }

    /** A cluster with no bucket space for the triggered document type is rejected at construction. */
    @Test
    void throwsWhenUnknownBuckets() {
        assertThrows(NullPointerException.class,
                     () -> new Reindexer(new Cluster("cluster", Map.of()), triggers(0), database, ReindexerTest::failIfCalled, metric, clock));
    }

    /** Reindexing fails fast when another thread already holds the reindexing lock for the cluster. */
    @Test
    void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
        Reindexer reindexer = new Reindexer(cluster, triggers(0), database, ReindexerTest::failIfCalled, metric, clock);
        // The lock must be taken on a different thread than the one calling reindex().
        ExecutorService lockHolder = Executors.newSingleThreadExecutor();
        try {
            lockHolder.submit(() -> database.lockReindexing("cluster")).get();
            assertThrows(ReindexingLockException.class, reindexer::reindex);
        }
        finally {
            lockHolder.shutdownNow();
        }
    }

    /** With no triggers, reindex() neither visits nor reports any metrics. */
    @Test
    @Timeout(10)
    void nothingToDoWithEmptyConfig() throws ReindexingLockException {
        new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
        assertEquals(Map.of(), metric.metrics());
    }

    /** Verifies the visitor parameters built for a document type, resume token and window size increment. */
    @Test
    void testParameters() {
        Reindexer reindexer = new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock);
        ProgressToken token = new ProgressToken();
        VisitorParameters parameters = reindexer.createParameters(music, 0.3, token);
        assertEquals("music:[document]", parameters.getFieldSet());
        assertSame(token, parameters.getResumeToken());
        assertEquals("default", parameters.getBucketSpace());
        assertEquals("[Content:cluster=cluster]", parameters.getRoute().toString());
        assertEquals("cluster", parameters.getRemoteDataHandler());
        assertEquals("music", parameters.getDocumentSelection());
        assertEquals(DocumentProtocol.Priority.NORMAL_3, parameters.getPriority());
        assertEquals(0.3, ((DynamicThrottlePolicy) parameters.getThrottlePolicy()).getWindowSizeIncrement());
    }

    /**
     * Walks the "music" document type through successive reindexing runs. The runs cover:
     * the initial assumed-done state, a trigger not yet due, success, an abort after progress,
     * failure, the failure grace period, and resumption afterwards.
     */
    @Test
    @Timeout(10)
    void testReindexing() throws ReindexingLockException {
        // Reindexer is created against an empty database, so any ready document types are assumed already done.
        new Reindexer(cluster, triggers(-10), database, ReindexerTest::failIfCalled, metric, clock).reindex();
        Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
        assertEquals(reindexing, database.readReindexing("cluster"));

        // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
        // Nothing happens, since it's not yet time. This should only happen when there is high clock skew.
        clock.advance(Duration.ofMillis(5));
        new Reindexer(cluster, triggers(10), database, ReindexerTest::failIfCalled, metric, clock).reindex();
        assertEquals(reindexing, database.readReindexing("cluster"));

        // It's time to reindex the "music" documents — let this complete successfully.
        clock.advance(Duration.ofMillis(10));
        AtomicBoolean shutDown = new AtomicBoolean();
        // Completion is signalled from this executor's thread; it is shut down when the test ends, pass or fail.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            new Reindexer(cluster, triggers(10), database, parameters -> {
                        database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
                        executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
                        return () -> shutDown.set(true);
                    }, metric, clock).reindex();
            reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
            assertEquals(reindexing, database.readReindexing("cluster"));
            assertTrue(shutDown.get(), "Session was shut down");
            assertEquals(Map.of("reindexing.progress", Map.of(Map.of("documenttype", "music",
                                                                     "clusterid", "cluster"),
                                                              1.0)),
                         metric.metrics());

            // One more reindexing, this time shut down before visit completes, but after progress is reported.
            clock.advance(Duration.ofMillis(10));
            metric.metrics().clear();
            shutDown.set(false);
            // The visiting function must shut down the very reindexer it is passed to, so that reindexer is
            // published through this reference once constructed.
            AtomicReference<Reindexer> aborted = new AtomicReference<>();
            aborted.set(new Reindexer(cluster, triggers(20), database, parameters -> {
                        database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
                        parameters.getControlHandler().onProgress(new ProgressToken());
                        aborted.get().shutdown();
                        return () -> {
                            shutDown.set(true);
                            parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down");
                        };
                    }, metric, clock));
            aborted.get().reindex();
            reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
            assertEquals(reindexing, database.readReindexing("cluster"));
            assertTrue(shutDown.get(), "Session was shut down");
            assertEquals(1.0, // new ProgressToken() is 100% done.
                         metric.metrics().get("reindexing.progress")
                               .get(Map.of("documenttype", "music",
                                           "clusterid", "cluster")));

            // Reindexer is created without any ready document types, which means nothing should run.
            new Reindexer(cluster, triggers(), database, ReindexerTest::failIfCalled, metric, clock).reindex();
            assertEquals(reindexing, database.readReindexing("cluster"));

            // Last reindexing fails.
            clock.advance(Duration.ofMillis(10));
            shutDown.set(false);
            new Reindexer(cluster, triggers(30), database, parameters -> {
                        database.writeReindexing(Reindexing.empty(), "cluster"); // Wipe database to verify we write data from reindexer.
                        executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
                        return () -> shutDown.set(true);
                    }, metric, clock).reindex();
            reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error"));
            assertEquals(reindexing, database.readReindexing("cluster"));
            assertTrue(shutDown.get(), "Session was shut down");

            // Document type is ignored in the next run, as it has failed, and the grace period is not yet over.
            clock.advance(Reindexer.failureGrace.minusMillis(1));
            new Reindexer(cluster, triggers(30), database, ReindexerTest::failIfCalled, metric, clock).reindex();
            assertEquals(reindexing, database.readReindexing("cluster"));

            // When the failure grace period is over, reindexing resumes as usual.
            clock.advance(Duration.ofMillis(1));
            shutDown.set(false);
            new Reindexer(cluster, triggers(30), database, parameters -> {
                executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
                return () -> shutDown.set(true);
            }, metric, clock).reindex();
            reindexing = reindexing.with(music, reindexing.status().get(music).running().successful(clock.instant()));
            assertEquals(reindexing, database.readReindexing("cluster"));
            assertTrue(shutDown.get(), "Session was shut down");
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Creates one {@link Trigger} for the "music" type per given ready time (milliseconds since epoch).
     * Every trigger gets the fixed value 0.2 as its last constructor argument (presumably the speed; see {@link Trigger}).
     */
    List<Trigger> triggers(long... ready) {
        return Arrays.stream(ready).mapToObj(at -> new Trigger(music, Instant.ofEpochMilli(at), 0.2)).toList();
    }

}