diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-10-30 17:11:27 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-10-30 17:11:27 +0100 |
commit | 6dbc30c975ef0f7b43c41bc465fe6c1fc3470509 (patch) | |
tree | 83568d6276a338d8ea83215c6a956564e7023da8 | |
parent | 8e55369d6748715f0c67c8f9f986921fcded2777 (diff) |
Add data and storage of data for reindexing progress
9 files changed, 454 insertions, 3 deletions
diff --git a/clustercontroller-reindexer/CMakeLists.txt b/clustercontroller-reindexer/CMakeLists.txt new file mode 100644 index 00000000000..4d3fde15fca --- /dev/null +++ b/clustercontroller-reindexer/CMakeLists.txt @@ -0,0 +1,2 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +install_fat_java_artifact(clustercontroller-reindexer) diff --git a/clustercontroller-reindexer/pom.xml b/clustercontroller-reindexer/pom.xml new file mode 100644 index 00000000000..3791ea7d3f4 --- /dev/null +++ b/clustercontroller-reindexer/pom.xml @@ -0,0 +1,80 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>parent</artifactId> + <groupId>com.yahoo.vespa</groupId> + <version>7-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>clustercontroller-reindexer</artifactId> + <packaging>container-plugin</packaging> + + <dependencies> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>container-dev</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>zkfacade</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>zookeeper-server-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-provisioning</artifactId> + <version>${project.version}</version> + 
<scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-model-api</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-model</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>com.yahoo.vespa</groupId> + <artifactId>bundle-plugin</artifactId> + <version>${project.version}</version> + <extensions>true</extensions> + <configuration> + <useCommonAssemblyIds>true</useCommonAssemblyIds> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project>
\ No newline at end of file diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java new file mode 100644 index 00000000000..c9ca0d94995 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java @@ -0,0 +1,193 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import com.yahoo.document.DocumentType; +import com.yahoo.documentapi.ProgressToken; + +import java.time.Instant; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.UnaryOperator; +import java.util.stream.Stream; + +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Reindexing status per document type. + * + * @author jonmv + */ +public class Reindexing { + + private static Reindexing empty = new Reindexing(Map.of()); + + private final Map<DocumentType, Status> status; + + Reindexing(Map<DocumentType, Status> status) { + this.status = Map.copyOf(status); + } + + public static Reindexing empty() { + return empty; + } + + public Reindexing with(DocumentType documentType, Status updated) { + return new Reindexing(Stream.concat(Stream.of(documentType), + status.keySet().stream()) + .distinct() + .collect(toUnmodifiableMap(type -> type, + type -> documentType.equals(type) ? updated : status.get(type)))); + } + + /** Reindexing status per document type, for types where this is known. 
*/ + public Map<DocumentType, Status> status() { + return status; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Reindexing that = (Reindexing) o; + return status.equals(that.status); + } + + @Override + public int hashCode() { + return Objects.hash(status); + } + + @Override + public String toString() { + return "Reindexing status " + status; + } + + /** + * Reindexing status for a single document type, in an application. Immutable. + * + * Reindexing starts at a given instant, and is progressed by visitors. + */ + public static class Status { + + private final Instant startedAt; + private final Instant endedAt; + private final ProgressToken progress; + private final State state; + private final String message; + + Status(Instant startedAt, Instant endedAt, ProgressToken progress, State state, String message) { + this.startedAt = startedAt; + this.endedAt = endedAt; + this.progress = progress; + this.state = state; + this.message = message; + } + + /** Returns a new, empty status, with no progress or result, in state READY. */ + public static Status ready(Instant now) { + return new Status(requireNonNull(now), null, new ProgressToken(), State.READY, null); + } + + /** Returns a copy of this, in state RUNNING. */ + public Status running() { + if (state != State.READY) + throw new IllegalStateException("Current state must be READY when changing to RUNNING"); + return new Status(startedAt, null, progress, State.RUNNING, null); + } + + /** Returns a copy of this with the given progress. */ + public Status progressed(ProgressToken progress) { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when updating progress"); + return new Status(startedAt, null, requireNonNull(progress), state, null); + } + + /** Returns a halted copy of this, i.e., one back in state READY, with its progress retained. 
*/ + public Status halted() { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when changing to READY"); + return new Status(startedAt, null, progress, State.READY, null); + } + + /** Returns a copy of this with the given end instant, in state SUCCESSFUL. */ + public Status successful(Instant now) { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when changing to SUCCESSFUL"); + return new Status(startedAt, requireNonNull(now), null, State.SUCCESSFUL, null); + } + + /** Returns a copy of this with the given end instant and failure message, in state FAILED. */ + public Status failed(Instant now, String message) { + if (state != State.RUNNING) + throw new IllegalStateException("Current state must be RUNNING when changing to FAILED"); + return new Status(startedAt, requireNonNull(now), null, State.FAILED, requireNonNull(message)); + } + + public Instant startedAt() { + return startedAt; + } + + public Optional<Instant> endedAt() { + return Optional.ofNullable(endedAt); + } + + public Optional<ProgressToken> progress() { + return Optional.ofNullable(progress); + } + + public State state() { + return state; + } + + public Optional<String> message() { + return Optional.ofNullable(message); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return startedAt.equals(status.startedAt) && + Objects.equals(endedAt, status.endedAt) && + Objects.equals(progress().map(ProgressToken::serializeToString), + status.progress().map(ProgressToken::serializeToString)) && + state == status.state && + Objects.equals(message, status.message); + } + + @Override + public int hashCode() { + return Objects.hash(startedAt, endedAt, progress().map(ProgressToken::serializeToString), state, message); + } + + @Override + public String toString() { + return state + (message != null ? 
" (" + message + ")" : "") + + ", started at " + startedAt + + (endedAt != null ? ", ended at " + endedAt : "") + + (progress != null ? ", with progress " + progress : ""); + } + + } + + + public enum State { + + /** Visit ready to be started. */ + READY, + + /** Visit currently running. */ + RUNNING, + + /** Visit completed successfully. */ + SUCCESSFUL, + + /** Visit failed fatally. */ + FAILED + + } + +} diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java new file mode 100644 index 00000000000..5eef93dad23 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java @@ -0,0 +1,121 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexing.Status; +import com.google.inject.Inject; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.path.Path; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Inspector; +import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.vespa.curator.Curator; +import com.yahoo.yolean.Exceptions; + +import java.time.Instant; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Reads and writes status of initiated reindexing jobs. 
+ * + * @author jonmv + */ +public class ReindexingCurator { + + private static final String STATUS = "status"; + private static final String TYPE = "type"; + private static final String STARTED_MILLIS = "startedMillis"; + private static final String ENDED_MILLIS = "endedMillis"; + private static final String PROGRESS = "progress"; + private static final String STATE = "state"; + private static final String MESSAGE = "message"; + + private static final Path statusPath = Path.fromString("/reindexing/v1/status"); + + private final Curator curator; + private final ReindexingSerializer serializer; + + public ReindexingCurator(Curator curator, DocumentTypeManager manager) { + this.curator = curator; + this.serializer = new ReindexingSerializer(manager); + } + + public Reindexing readReindexing() { + return curator.getData(statusPath).map(serializer::deserialize) + .orElse(Reindexing.empty()); + } + + public void writeReindexing(Reindexing reindexing) { + curator.set(statusPath, serializer.serialize(reindexing)); + } + + + private static class ReindexingSerializer { + + private final DocumentTypeManager types; + + public ReindexingSerializer(DocumentTypeManager types) { + this.types = types; + } + + private byte[] serialize(Reindexing reindexing) { + Cursor root = new Slime().setObject(); + Cursor statusArray = root.setArray(STATUS); + reindexing.status().forEach((type, status) -> { + Cursor statusObject = statusArray.addObject(); + statusObject.setString(TYPE, type.getName()); + statusObject.setLong(STARTED_MILLIS, status.startedAt().toEpochMilli()); + status.endedAt().ifPresent(endedAt -> statusObject.setLong(ENDED_MILLIS, endedAt.toEpochMilli())); + status.progress().ifPresent(progress -> statusObject.setString(PROGRESS, progress.serializeToString())); + statusObject.setString(STATE, toString(status.state())); + status.message().ifPresent(message -> statusObject.setString(MESSAGE, message)); + }); + return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root)); + } + 
+ private Reindexing deserialize(byte[] json) { + return new Reindexing(SlimeUtils.entriesStream(SlimeUtils.jsonToSlimeOrThrow(json).get().field(STATUS)) + .filter(object -> require(TYPE, object, field -> types.hasDataType(field.asString()))) // Forget unknown documents. + .collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())), + object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), + get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())), + get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())), + require(STATE, object, field -> toState(field.asString())), + get(MESSAGE, object, field -> field.asString()))))); + } + + private static <T> T get(String name, Inspector object, Function<Inspector, T> mapper) { + return object.field(name).valid() ? mapper.apply(object.field(name)) : null; + } + + private static <T> T require(String name, Inspector object, Function<Inspector, T> mapper) { + return requireNonNull(get(name, object, mapper)); + } + + private static String toString(Reindexing.State state) { + switch (state) { + case READY: return "ready"; + case RUNNING: return "running"; + case SUCCESSFUL: return "successful"; + case FAILED: return "failed"; + default: throw new IllegalArgumentException("Unexpected state '" + state + "'"); + } + } + + private static Reindexing.State toState(String value) { + switch (value) { + case "ready": return Reindexing.State.READY; + case "running": return Reindexing.State.RUNNING; + case "successful": return Reindexing.State.SUCCESSFUL; + case "failed": return Reindexing.State.FAILED; + default: throw new IllegalArgumentException("Unknown state '" + value + "'"); + } + } + + } + +} diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java new file mode 
100644 index 00000000000..0501ff474ca --- /dev/null +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java @@ -0,0 +1,48 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.searchdefinition.derived.Deriver; +import com.yahoo.vespa.curator.mock.MockCurator; +import org.junit.jupiter.api.Test; + +import java.time.Instant; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class ReindexingCuratorTest { + + @Test + void testSerialization() { + DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); + DocumentmanagerConfig emptyConfig = new DocumentmanagerConfig.Builder().build(); + DocumentTypeManager manager = new DocumentTypeManager(musicConfig); + DocumentType music = manager.getDocumentType("music"); + MockCurator mockCurator = new MockCurator(); + ReindexingCurator curator = new ReindexingCurator(mockCurator, manager); + + assertEquals(Reindexing.empty(), curator.readReindexing()); + + Reindexing.Status status = Reindexing.Status.ready(Instant.ofEpochMilli(123)) + .running() + .progressed(new ProgressToken()); + Reindexing reindexing = Reindexing.empty().with(music, status); + curator.writeReindexing(reindexing); + assertEquals(reindexing, curator.readReindexing()); + + status = status.halted().running().failed(Instant.ofEpochMilli(321), "error"); + reindexing = reindexing.with(music, status); + curator.writeReindexing(reindexing); + assertEquals(reindexing, curator.readReindexing()); + + // Unknown document types are forgotten. 
+ assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing()); + } + +} diff --git a/clustercontroller-reindexer/src/test/resources/schemas/music.sd b/clustercontroller-reindexer/src/test/resources/schemas/music.sd new file mode 100644 index 00000000000..a289f5a686b --- /dev/null +++ b/clustercontroller-reindexer/src/test/resources/schemas/music.sd @@ -0,0 +1,6 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +search music { + document music { + field artist type string { } + } +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java index 489128c211e..05e66ef95f5 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java @@ -20,9 +20,9 @@ public class VisitorControlHandler { SUCCESS, /** Aborted by user. */ ABORTED, - /** Failure */ + /** Fatal failure. */ FAILURE, - /** Create visitor reply did not return within the specified timeframe. */ + /** Create visitor reply did not return within the specified timeframe, or the session timed out. 
*/ TIMEOUT }; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 6203b724e95..e1d18080faf 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -735,7 +735,7 @@ public class MessageBusVisitorSession implements VisitorSession { String msg = "Got exception of type " + e.getClass().getName() + " with message '" + e.getMessage() + "' while processing reply in visitor session"; - e.printStackTrace(); + log.log(Level.WARNING, msg, e); transitionTo(new StateDescription(State.FAILED, msg)); } catch (Throwable t) { // We can't reliably handle this; take a nosedive @@ -37,6 +37,7 @@ <module>clustercontroller-apps</module> <module>clustercontroller-apputil</module> <module>clustercontroller-core</module> + <module>clustercontroller-reindexer</module> <module>clustercontroller-standalone</module> <module>clustercontroller-utils</module> <module>component</module> |