diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-10-30 17:11:27 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-10-30 17:11:27 +0100 |
commit | 6dbc30c975ef0f7b43c41bc465fe6c1fc3470509 (patch) | |
tree | 83568d6276a338d8ea83215c6a956564e7023da8 /clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java | |
parent | 8e55369d6748715f0c67c8f9f986921fcded2777 (diff) |
Add data and storage of data for reindexing progress
Diffstat (limited to 'clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java new file mode 100644 index 00000000000..5eef93dad23 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -0,0 +1,121 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexing.Status; +import com.google.inject.Inject; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.path.Path; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Inspector; +import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.yolean.Exceptions; + +import java.time.Instant; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Reads and writes status of initiated reindexing jobs. + * + * @author jonmv + */ +public class ReindexingCurator { + + private static final String STATUS = "status"; + private static final String TYPE = "type"; + private static final String STARTED_MILLIS = "startedMillis"; + private static final String ENDED_MILLIS = "endedMillis"; + private static final String PROGRESS = "progress"; + private static final String STATE = "state"; + private static final String MESSAGE = "message"; + + private static final Path statusPath = Path.fromString("/reindexing/v1/status"); + + private final Curator curator; + private final ReindexingSerializer serializer; + + public ReindexingCurator(Curator curator, DocumentTypeManager manager) { + this.curator = curator; + this.serializer = new ReindexingSerializer(manager); + } + + public Reindexing readReindexing() { + return curator.getData(statusPath).map(serializer::deserialize) + .orElse(Reindexing.empty()); + } + + public void writeReindexing(Reindexing reindexing) { + curator.set(statusPath, serializer.serialize(reindexing)); + } + + + private static class ReindexingSerializer { + + private final DocumentTypeManager types; + + public ReindexingSerializer(DocumentTypeManager types) { + this.types = types; + } + + private byte[] serialize(Reindexing reindexing) { + Cursor root = new Slime().setObject(); + Cursor statusArray = root.setArray(STATUS); + reindexing.status().forEach((type, status) -> { + Cursor statusObject = statusArray.addObject(); + statusObject.setString(TYPE, type.getName()); + statusObject.setLong(STARTED_MILLIS, status.startedAt().toEpochMilli()); + status.endedAt().ifPresent(endedAt -> statusObject.setLong(ENDED_MILLIS, endedAt.toEpochMilli())); + status.progress().ifPresent(progress -> statusObject.setString(PROGRESS, progress.serializeToString())); + statusObject.setString(STATE, toString(status.state())); + status.message().ifPresent(message -> statusObject.setString(MESSAGE, message)); + }); + return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root)); + } + + private Reindexing deserialize(byte[] json) { + return new Reindexing(SlimeUtils.entriesStream(SlimeUtils.jsonToSlimeOrThrow(json).get().field(STATUS)) + .filter(object -> require(TYPE, object, field -> types.hasDataType(field.asString()))) // Forget unknown documents. + .collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())), + object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), + get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), + get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())), + require(STATE, object, field -> toState(field.asString())), + get(MESSAGE, object, field -> field.asString()))))); + } + + private static <T> T get(String name, Inspector object, Function<Inspector, T> mapper) { + return object.field(name).valid() ? mapper.apply(object.field(name)) : null; + } + + private static <T> T require(String name, Inspector object, Function<Inspector, T> mapper) { + return requireNonNull(get(name, object, mapper)); + } + + private static String toString(Reindexing.State state) { + switch (state) { + case READY: return "ready"; + case RUNNING: return "running"; + case SUCCESSFUL: return "successful"; + case FAILED: return "failed"; + default: throw new IllegalArgumentException("Unexpected state '" + state + "'"); + } + } + + private static Reindexing.State toState(String value) { + switch (value) { + case "ready": return Reindexing.State.READY; + case "running": return Reindexing.State.RUNNING; + case "successful": return Reindexing.State.SUCCESSFUL; + case "failed": return Reindexing.State.FAILED; + default: throw new IllegalArgumentException("Unknown state '" + value + "'"); + } + } + + } + +} |