aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
blob: 6f13a07414cd14575ffd9666abd354fce694d10d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.reindexing;

import com.yahoo.document.DocumentType;
import com.yahoo.documentapi.ProgressToken;

import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableMap;

/**
 * Reindexing status per document type.
 *
 * @author jonmv
 */
public class Reindexing {

    private static Reindexing empty = new Reindexing(Map.of());

    private final Map<DocumentType, Status> status;

    Reindexing(Map<DocumentType, Status> status) {
        this.status = Map.copyOf(status);
    }

    public static Reindexing empty() {
        return empty;
    }

    public Reindexing with(DocumentType documentType, Status updated) {
        return new Reindexing(Stream.concat(Stream.of(documentType),
                                            status.keySet().stream())
                                    .distinct()
                                    .collect(toUnmodifiableMap(type -> type,
                                                               type -> documentType.equals(type) ? updated : status.get(type))));
    }

    /** Reindexing status per document type, for types where this is known. */
    public Map<DocumentType, Status> status() {
        return status;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Reindexing that = (Reindexing) o;
        return status.equals(that.status);
    }

    @Override
    public int hashCode() {
        return Objects.hash(status);
    }

    @Override
    public String toString() {
        return "Reindexing status " + status;
    }

    /**
     * Reindexing status for a single document type, in an application. Immutable.
     *
     * Reindexing starts at a given instant, and is progressed by visitors.
     */
    public static class Status {

        private final Instant startedAt;
        private final Instant endedAt;
        private final String progress;
        private final State state;
        private final String message;

        Status(Instant startedAt, Instant endedAt, String progress, State state, String message) {
            this.startedAt = startedAt;
            this.endedAt = endedAt;
            this.progress = progress;
            this.state = state;
            this.message = message;
        }

        /** Returns a new, empty status, with no progress or result, in state READY. */
        public static Status ready(Instant now) {
            return new Status(requireNonNull(now), null, null, State.READY, null);
        }

        /** Returns a copy of this, in state RUNNING. */
        public Status running() {
            if (state != State.READY && state != State.FAILED)
                throw new IllegalStateException("Current state must be READY or FAILED when changing to RUNNING");
            return new Status(startedAt, null, progress, State.RUNNING, null);
        }

        /** Returns a copy of this with the given progress. */
        public Status progressed(ProgressToken progress) {
            if (state != State.RUNNING)
                throw new IllegalStateException("Current state must be RUNNING when updating progress");
            synchronized (progress) {
                return new Status(startedAt, null, progress.serializeToString(), state, null);
            }
        }

        /** Returns a copy of this in state HALTED. */
        public Status halted() {
            if (state != State.RUNNING)
                throw new IllegalStateException("Current state must be RUNNING when changing to READY");
            return new Status(startedAt, null, progress, State.READY, null);
        }

        /** Returns a copy of this with the given end instant, in state SUCCESSFUL. */
        public Status successful(Instant now) {
            if (state != State.RUNNING)
                throw new IllegalStateException("Current state must be RUNNING when changing to SUCCESSFUL");
            return new Status(startedAt, requireNonNull(now), null, State.SUCCESSFUL, null);
        }

        /** Returns a copy of this with the given end instant and failure message, in state FAILED. */
        public Status failed(Instant now, String message) {
            if (state != State.RUNNING)
                throw new IllegalStateException("Current state must be RUNNING when changing to FAILED");
            return new Status(startedAt, requireNonNull(now), progress, State.FAILED, requireNonNull(message));
        }

        public Instant startedAt() {
            return startedAt;
        }

        public Optional<Instant> endedAt() {
            return Optional.ofNullable(endedAt);
        }

        public Optional<ProgressToken> progress() {
            return Optional.ofNullable(progress).map(ProgressToken::fromSerializedString);
        }

        public State state() {
            return state;
        }

        public Optional<String> message() {
            return Optional.ofNullable(message);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Status status = (Status) o;
            return startedAt.equals(status.startedAt) &&
                   Objects.equals(endedAt, status.endedAt) &&
                   Objects.equals(progress().map(ProgressToken::serializeToString),
                                  status.progress().map(ProgressToken::serializeToString)) &&
                   state == status.state &&
                   Objects.equals(message, status.message);
        }

        @Override
        public int hashCode() {
            return Objects.hash(startedAt, endedAt, progress().map(ProgressToken::serializeToString), state, message);
        }

        @Override
        public String toString() {
            return state + (message != null ? " (" + message + ")" : "") +
                   ", started at " + startedAt +
                   (endedAt != null ? ", ended at " + endedAt : "") +
                   (progress != null ? ", with progress " + progress : "");
        }

    }


    public enum State {

        /** Visit ready to be started. */
        READY,

        /** Visit currently running. */
        RUNNING,

        /** Visit completed successfully. */
        SUCCESSFUL,

        /** Visit failed fatally. */
        FAILED

    }


    public static class Trigger {

        private final DocumentType type;
        private final Instant readyAt;
        private final double speed;

        public Trigger(DocumentType type, Instant readyAt, double speed) {
            this.type = requireNonNull(type);
            this.readyAt = requireNonNull(readyAt);
            this.speed = speed;
        }

        public DocumentType type() {
            return type;
        }

        public Instant readyAt() {
            return readyAt;
        }

        public double speed() {
            return speed;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Trigger trigger = (Trigger) o;
            return Double.compare(trigger.speed, speed) == 0 && type.equals(trigger.type) && readyAt.equals(trigger.readyAt);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, readyAt, speed);
        }

        @Override
        public String toString() {
            return "Trigger{" +
                   "type=" + type +
                   ", readyAt=" + readyAt +
                   ", speed=" + speed +
                   '}';
        }

    }

}