1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexing.Status;
import com.google.inject.Inject;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.path.Path;
import com.yahoo.slime.Cursor;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import com.yahoo.slime.SlimeUtils;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.yolean.Exceptions;
import java.time.Instant;
import java.util.function.Function;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableMap;
/**
* Reads and writes status of initiated reindexing jobs.
*
* @author jonmv
*/
public class ReindexingCurator {
private static final String STATUS = "status";
private static final String TYPE = "type";
private static final String STARTED_MILLIS = "startedMillis";
private static final String ENDED_MILLIS = "endedMillis";
private static final String PROGRESS = "progress";
private static final String STATE = "state";
private static final String MESSAGE = "message";
private static final Path statusPath = Path.fromString("/reindexing/v1/status");
private final Curator curator;
private final ReindexingSerializer serializer;
public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
this.curator = curator;
this.serializer = new ReindexingSerializer(manager);
}
public Reindexing readReindexing() {
return curator.getData(statusPath).map(serializer::deserialize)
.orElse(Reindexing.empty());
}
public void writeReindexing(Reindexing reindexing) {
curator.set(statusPath, serializer.serialize(reindexing));
}
private static class ReindexingSerializer {
private final DocumentTypeManager types;
public ReindexingSerializer(DocumentTypeManager types) {
this.types = types;
}
private byte[] serialize(Reindexing reindexing) {
Cursor root = new Slime().setObject();
Cursor statusArray = root.setArray(STATUS);
reindexing.status().forEach((type, status) -> {
Cursor statusObject = statusArray.addObject();
statusObject.setString(TYPE, type.getName());
statusObject.setLong(STARTED_MILLIS, status.startedAt().toEpochMilli());
status.endedAt().ifPresent(endedAt -> statusObject.setLong(ENDED_MILLIS, endedAt.toEpochMilli()));
status.progress().ifPresent(progress -> statusObject.setString(PROGRESS, progress.serializeToString()));
statusObject.setString(STATE, toString(status.state()));
status.message().ifPresent(message -> statusObject.setString(MESSAGE, message));
});
return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root));
}
private Reindexing deserialize(byte[] json) {
return new Reindexing(SlimeUtils.entriesStream(SlimeUtils.jsonToSlimeOrThrow(json).get().field(STATUS))
.filter(object -> require(TYPE, object, field -> types.hasDataType(field.asString()))) // Forget unknown documents.
.collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())),
object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())),
require(STATE, object, field -> toState(field.asString())),
get(MESSAGE, object, field -> field.asString())))));
}
private static <T> T get(String name, Inspector object, Function<Inspector, T> mapper) {
return object.field(name).valid() ? mapper.apply(object.field(name)) : null;
}
private static <T> T require(String name, Inspector object, Function<Inspector, T> mapper) {
return requireNonNull(get(name, object, mapper));
}
private static String toString(Reindexing.State state) {
switch (state) {
case READY: return "ready";
case RUNNING: return "running";
case SUCCESSFUL: return "successful";
case FAILED: return "failed";
default: throw new IllegalArgumentException("Unexpected state '" + state + "'");
}
}
private static Reindexing.State toState(String value) {
switch (value) {
case "ready": return Reindexing.State.READY;
case "running": return Reindexing.State.RUNNING;
case "successful": return Reindexing.State.SUCCESSFUL;
case "failed": return Reindexing.State.FAILED;
default: throw new IllegalArgumentException("Unknown state '" + value + "'");
}
}
}
}
|