aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-10-30 17:11:27 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-10-30 17:11:27 +0100
commit6dbc30c975ef0f7b43c41bc465fe6c1fc3470509 (patch)
tree83568d6276a338d8ea83215c6a956564e7023da8 /clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
parent8e55369d6748715f0c67c8f9f986921fcded2777 (diff)
Add data and storage of data for reindexing progress
Diffstat (limited to 'clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java121
1 files changed, 121 insertions, 0 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
new file mode 100644
index 00000000000..5eef93dad23
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -0,0 +1,121 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexing.Status;
+import com.google.inject.Inject;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.path.Path;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.vespa.curator.Curator;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Instant;
+import java.util.function.Function;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Reads and writes status of initiated reindexing jobs.
+ *
+ * @author jonmv
+ */
+public class ReindexingCurator {
+
+ private static final String STATUS = "status";
+ private static final String TYPE = "type";
+ private static final String STARTED_MILLIS = "startedMillis";
+ private static final String ENDED_MILLIS = "endedMillis";
+ private static final String PROGRESS = "progress";
+ private static final String STATE = "state";
+ private static final String MESSAGE = "message";
+
+ private static final Path statusPath = Path.fromString("/reindexing/v1/status");
+
+ private final Curator curator;
+ private final ReindexingSerializer serializer;
+
+ public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
+ this.curator = curator;
+ this.serializer = new ReindexingSerializer(manager);
+ }
+
+ public Reindexing readReindexing() {
+ return curator.getData(statusPath).map(serializer::deserialize)
+ .orElse(Reindexing.empty());
+ }
+
+ public void writeReindexing(Reindexing reindexing) {
+ curator.set(statusPath, serializer.serialize(reindexing));
+ }
+
+
+ private static class ReindexingSerializer {
+
+ private final DocumentTypeManager types;
+
+ public ReindexingSerializer(DocumentTypeManager types) {
+ this.types = types;
+ }
+
+ private byte[] serialize(Reindexing reindexing) {
+ Cursor root = new Slime().setObject();
+ Cursor statusArray = root.setArray(STATUS);
+ reindexing.status().forEach((type, status) -> {
+ Cursor statusObject = statusArray.addObject();
+ statusObject.setString(TYPE, type.getName());
+ statusObject.setLong(STARTED_MILLIS, status.startedAt().toEpochMilli());
+ status.endedAt().ifPresent(endedAt -> statusObject.setLong(ENDED_MILLIS, endedAt.toEpochMilli()));
+ status.progress().ifPresent(progress -> statusObject.setString(PROGRESS, progress.serializeToString()));
+ statusObject.setString(STATE, toString(status.state()));
+ status.message().ifPresent(message -> statusObject.setString(MESSAGE, message));
+ });
+ return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root));
+ }
+
+ private Reindexing deserialize(byte[] json) {
+ return new Reindexing(SlimeUtils.entriesStream(SlimeUtils.jsonToSlimeOrThrow(json).get().field(STATUS))
+ .filter(object -> require(TYPE, object, field -> types.hasDataType(field.asString()))) // Forget unknown documents.
+ .collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())),
+ object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
+ get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
+ get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())),
+ require(STATE, object, field -> toState(field.asString())),
+ get(MESSAGE, object, field -> field.asString())))));
+ }
+
+ private static <T> T get(String name, Inspector object, Function<Inspector, T> mapper) {
+ return object.field(name).valid() ? mapper.apply(object.field(name)) : null;
+ }
+
+ private static <T> T require(String name, Inspector object, Function<Inspector, T> mapper) {
+ return requireNonNull(get(name, object, mapper));
+ }
+
+ private static String toString(Reindexing.State state) {
+ switch (state) {
+ case READY: return "ready";
+ case RUNNING: return "running";
+ case SUCCESSFUL: return "successful";
+ case FAILED: return "failed";
+ default: throw new IllegalArgumentException("Unexpected state '" + state + "'");
+ }
+ }
+
+ private static Reindexing.State toState(String value) {
+ switch (value) {
+ case "ready": return Reindexing.State.READY;
+ case "running": return Reindexing.State.RUNNING;
+ case "successful": return Reindexing.State.SUCCESSFUL;
+ case "failed": return Reindexing.State.FAILED;
+ default: throw new IllegalArgumentException("Unknown state '" + value + "'");
+ }
+ }
+
+ }
+
+}