aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-10-30 17:11:27 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-10-30 17:11:27 +0100
commit6dbc30c975ef0f7b43c41bc465fe6c1fc3470509 (patch)
tree83568d6276a338d8ea83215c6a956564e7023da8
parent8e55369d6748715f0c67c8f9f986921fcded2777 (diff)
Add data and storage of data for reindexing progress
-rw-r--r--clustercontroller-reindexer/CMakeLists.txt2
-rw-r--r--clustercontroller-reindexer/pom.xml80
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java193
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java121
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java48
-rw-r--r--clustercontroller-reindexer/src/test/resources/schemas/music.sd6
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java4
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java2
-rw-r--r--pom.xml1
9 files changed, 454 insertions, 3 deletions
diff --git a/clustercontroller-reindexer/CMakeLists.txt b/clustercontroller-reindexer/CMakeLists.txt
new file mode 100644
index 00000000000..4d3fde15fca
--- /dev/null
+++ b/clustercontroller-reindexer/CMakeLists.txt
@@ -0,0 +1,2 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+install_fat_java_artifact(clustercontroller-reindexer)
diff --git a/clustercontroller-reindexer/pom.xml b/clustercontroller-reindexer/pom.xml
new file mode 100644
index 00000000000..3791ea7d3f4
--- /dev/null
+++ b/clustercontroller-reindexer/pom.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>parent</artifactId>
+ <groupId>com.yahoo.vespa</groupId>
+ <version>7-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>clustercontroller-reindexer</artifactId>
+ <packaging>container-plugin</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>container-dev</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>zkfacade</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>zookeeper-server-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-provisioning</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-model-api</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-model</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>bundle-plugin</artifactId>
+ <version>${project.version}</version>
+ <extensions>true</extensions>
+ <configuration>
+ <useCommonAssemblyIds>true</useCommonAssemblyIds>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project> \ No newline at end of file
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
new file mode 100644
index 00000000000..c9ca0d94995
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
@@ -0,0 +1,193 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import com.yahoo.document.DocumentType;
+import com.yahoo.documentapi.ProgressToken;
+
+import java.time.Instant;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Reindexing status per document type.
+ *
+ * @author jonmv
+ */
+public class Reindexing {
+
+ private static Reindexing empty = new Reindexing(Map.of());
+
+ private final Map<DocumentType, Status> status;
+
+ Reindexing(Map<DocumentType, Status> status) {
+ this.status = Map.copyOf(status);
+ }
+
+ public static Reindexing empty() {
+ return empty;
+ }
+
+ public Reindexing with(DocumentType documentType, Status updated) {
+ return new Reindexing(Stream.concat(Stream.of(documentType),
+ status.keySet().stream())
+ .distinct()
+ .collect(toUnmodifiableMap(type -> type,
+ type -> documentType.equals(type) ? updated : status.get(type))));
+ }
+
+ /** Reindexing status per document type, for types where this is known. */
+ public Map<DocumentType, Status> status() {
+ return status;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Reindexing that = (Reindexing) o;
+ return status.equals(that.status);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status);
+ }
+
+ @Override
+ public String toString() {
+ return "Reindexing status " + status;
+ }
+
+ /**
+ * Reindexing status for a single document type, in an application. Immutable.
+ *
+ * Reindexing starts at a given instant, and is progressed by visitors.
+ */
+ public static class Status {
+
+ private final Instant startedAt;
+ private final Instant endedAt;
+ private final ProgressToken progress;
+ private final State state;
+ private final String message;
+
+ Status(Instant startedAt, Instant endedAt, ProgressToken progress, State state, String message) {
+ this.startedAt = startedAt;
+ this.endedAt = endedAt;
+ this.progress = progress;
+ this.state = state;
+ this.message = message;
+ }
+
+ /** Returns a new, empty status, with no progress or result, in state READY. */
+ public static Status ready(Instant now) {
+ return new Status(requireNonNull(now), null, new ProgressToken(), State.READY, null);
+ }
+
+ /** Returns a copy of this, in state RUNNING. */
+ public Status running() {
+ if (state != State.READY)
+ throw new IllegalStateException("Current state must be READY when changing to RUNNING");
+ return new Status(startedAt, null, progress, State.RUNNING, null);
+ }
+
+ /** Returns a copy of this with the given progress. */
+ public Status progressed(ProgressToken progress) {
+ if (state != State.RUNNING)
+ throw new IllegalStateException("Current state must be RUNNING when updating progress");
+ return new Status(startedAt, null, requireNonNull(progress), state, null);
+ }
+
+ /** Returns a copy of this in state HALTED. */
+ public Status halted() {
+ if (state != State.RUNNING)
+ throw new IllegalStateException("Current state must be RUNNING when changing to READY");
+ return new Status(startedAt, null, progress, State.READY, null);
+ }
+
+ /** Returns a copy of this with the given end instant, in state SUCCESSFUL. */
+ public Status successful(Instant now) {
+ if (state != State.RUNNING)
+ throw new IllegalStateException("Current state must be RUNNING when changing to SUCCESSFUL");
+ return new Status(startedAt, requireNonNull(now), null, State.SUCCESSFUL, null);
+ }
+
+ /** Returns a copy of this with the given end instant and failure message, in state FAILED. */
+ public Status failed(Instant now, String message) {
+ if (state != State.RUNNING)
+ throw new IllegalStateException("Current state must be RUNNING when changing to FAILED");
+ return new Status(startedAt, requireNonNull(now), null, State.FAILED, requireNonNull(message));
+ }
+
+ public Instant startedAt() {
+ return startedAt;
+ }
+
+ public Optional<Instant> endedAt() {
+ return Optional.ofNullable(endedAt);
+ }
+
+ public Optional<ProgressToken> progress() {
+ return Optional.ofNullable(progress);
+ }
+
+ public State state() {
+ return state;
+ }
+
+ public Optional<String> message() {
+ return Optional.ofNullable(message);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Status status = (Status) o;
+ return startedAt.equals(status.startedAt) &&
+ Objects.equals(endedAt, status.endedAt) &&
+ Objects.equals(progress().map(ProgressToken::serializeToString),
+ status.progress().map(ProgressToken::serializeToString)) &&
+ state == status.state &&
+ Objects.equals(message, status.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(startedAt, endedAt, progress().map(ProgressToken::serializeToString), state, message);
+ }
+
+ @Override
+ public String toString() {
+ return state + (message != null ? " (" + message + ")" : "") +
+ ", started at " + startedAt +
+ (endedAt != null ? ", ended at " + endedAt : "") +
+ (progress != null ? ", with progress " + progress : "");
+ }
+
+ }
+
+
+ public enum State {
+
+ /** Visit ready to be started. */
+ READY,
+
+ /** Visit currently running. */
+ RUNNING,
+
+ /** Visit completed successfully. */
+ SUCCESSFUL,
+
+ /** Visit failed fatally. */
+ FAILED
+
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
new file mode 100644
index 00000000000..5eef93dad23
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
@@ -0,0 +1,121 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexing.Status;
+import com.google.inject.Inject;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.path.Path;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.vespa.curator.Curator;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Instant;
+import java.util.function.Function;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Reads and writes status of initiated reindexing jobs.
+ *
+ * @author jonmv
+ */
+public class ReindexingCurator {
+
+ private static final String STATUS = "status";
+ private static final String TYPE = "type";
+ private static final String STARTED_MILLIS = "startedMillis";
+ private static final String ENDED_MILLIS = "endedMillis";
+ private static final String PROGRESS = "progress";
+ private static final String STATE = "state";
+ private static final String MESSAGE = "message";
+
+ private static final Path statusPath = Path.fromString("/reindexing/v1/status");
+
+ private final Curator curator;
+ private final ReindexingSerializer serializer;
+
+ public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
+ this.curator = curator;
+ this.serializer = new ReindexingSerializer(manager);
+ }
+
+ public Reindexing readReindexing() {
+ return curator.getData(statusPath).map(serializer::deserialize)
+ .orElse(Reindexing.empty());
+ }
+
+ public void writeReindexing(Reindexing reindexing) {
+ curator.set(statusPath, serializer.serialize(reindexing));
+ }
+
+
+ private static class ReindexingSerializer {
+
+ private final DocumentTypeManager types;
+
+ public ReindexingSerializer(DocumentTypeManager types) {
+ this.types = types;
+ }
+
+ private byte[] serialize(Reindexing reindexing) {
+ Cursor root = new Slime().setObject();
+ Cursor statusArray = root.setArray(STATUS);
+ reindexing.status().forEach((type, status) -> {
+ Cursor statusObject = statusArray.addObject();
+ statusObject.setString(TYPE, type.getName());
+ statusObject.setLong(STARTED_MILLIS, status.startedAt().toEpochMilli());
+ status.endedAt().ifPresent(endedAt -> statusObject.setLong(ENDED_MILLIS, endedAt.toEpochMilli()));
+ status.progress().ifPresent(progress -> statusObject.setString(PROGRESS, progress.serializeToString()));
+ statusObject.setString(STATE, toString(status.state()));
+ status.message().ifPresent(message -> statusObject.setString(MESSAGE, message));
+ });
+ return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root));
+ }
+
+ private Reindexing deserialize(byte[] json) {
+ return new Reindexing(SlimeUtils.entriesStream(SlimeUtils.jsonToSlimeOrThrow(json).get().field(STATUS))
+ .filter(object -> require(TYPE, object, field -> types.hasDataType(field.asString()))) // Forget unknown documents.
+ .collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())),
+ object -> new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
+ get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
+ get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())),
+ require(STATE, object, field -> toState(field.asString())),
+ get(MESSAGE, object, field -> field.asString())))));
+ }
+
+ private static <T> T get(String name, Inspector object, Function<Inspector, T> mapper) {
+ return object.field(name).valid() ? mapper.apply(object.field(name)) : null;
+ }
+
+ private static <T> T require(String name, Inspector object, Function<Inspector, T> mapper) {
+ return requireNonNull(get(name, object, mapper));
+ }
+
+ private static String toString(Reindexing.State state) {
+ switch (state) {
+ case READY: return "ready";
+ case RUNNING: return "running";
+ case SUCCESSFUL: return "successful";
+ case FAILED: return "failed";
+ default: throw new IllegalArgumentException("Unexpected state '" + state + "'");
+ }
+ }
+
+ private static Reindexing.State toState(String value) {
+ switch (value) {
+ case "ready": return Reindexing.State.READY;
+ case "running": return Reindexing.State.RUNNING;
+ case "successful": return Reindexing.State.SUCCESSFUL;
+ case "failed": return Reindexing.State.FAILED;
+ default: throw new IllegalArgumentException("Unknown state '" + value + "'");
+ }
+ }
+
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
new file mode 100644
index 00000000000..0501ff474ca
--- /dev/null
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingCuratorTest.java
@@ -0,0 +1,48 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.searchdefinition.derived.Deriver;
+import com.yahoo.vespa.curator.mock.MockCurator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author jonmv
+ */
+class ReindexingCuratorTest {
+
+ @Test
+ void testSerialization() {
+ DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
+ DocumentmanagerConfig emptyConfig = new DocumentmanagerConfig.Builder().build();
+ DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
+ DocumentType music = manager.getDocumentType("music");
+ MockCurator mockCurator = new MockCurator();
+ ReindexingCurator curator = new ReindexingCurator(mockCurator, manager);
+
+ assertEquals(Reindexing.empty(), curator.readReindexing());
+
+ Reindexing.Status status = Reindexing.Status.ready(Instant.ofEpochMilli(123))
+ .running()
+ .progressed(new ProgressToken());
+ Reindexing reindexing = Reindexing.empty().with(music, status);
+ curator.writeReindexing(reindexing);
+ assertEquals(reindexing, curator.readReindexing());
+
+ status = status.halted().running().failed(Instant.ofEpochMilli(321), "error");
+ reindexing = reindexing.with(music, status);
+ curator.writeReindexing(reindexing);
+ assertEquals(reindexing, curator.readReindexing());
+
+ // Unknown document types are forgotten.
+ assertEquals(Reindexing.empty(), new ReindexingCurator(mockCurator, new DocumentTypeManager(emptyConfig)).readReindexing());
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/test/resources/schemas/music.sd b/clustercontroller-reindexer/src/test/resources/schemas/music.sd
new file mode 100644
index 00000000000..a289f5a686b
--- /dev/null
+++ b/clustercontroller-reindexer/src/test/resources/schemas/music.sd
@@ -0,0 +1,6 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+search music {
+ document music {
+ field artist type string { }
+ }
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
index 489128c211e..05e66ef95f5 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlHandler.java
@@ -20,9 +20,9 @@ public class VisitorControlHandler {
SUCCESS,
/** Aborted by user. */
ABORTED,
- /** Failure */
+ /** Fatal failure. */
FAILURE,
- /** Create visitor reply did not return within the specified timeframe. */
+ /** Create visitor reply did not return within the specified timeframe, or the session timed out. */
TIMEOUT
};
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 6203b724e95..e1d18080faf 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -735,7 +735,7 @@ public class MessageBusVisitorSession implements VisitorSession {
String msg = "Got exception of type " + e.getClass().getName() +
" with message '" + e.getMessage() +
"' while processing reply in visitor session";
- e.printStackTrace();
+ log.log(Level.WARNING, msg, e);
transitionTo(new StateDescription(State.FAILED, msg));
} catch (Throwable t) {
// We can't reliably handle this; take a nosedive
diff --git a/pom.xml b/pom.xml
index 689b8a7d3b2..2f789995ea3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,6 +37,7 @@
<module>clustercontroller-apps</module>
<module>clustercontroller-apputil</module>
<module>clustercontroller-core</module>
+ <module>clustercontroller-reindexer</module>
<module>clustercontroller-standalone</module>
<module>clustercontroller-utils</module>
<module>component</module>