// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.reindexing; import com.yahoo.document.DocumentType; import com.yahoo.documentapi.ProgressToken; import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Stream; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toUnmodifiableMap; /** * Reindexing status per document type. * * @author jonmv */ public class Reindexing { private static Reindexing empty = new Reindexing(Map.of()); private final Map status; Reindexing(Map status) { this.status = Map.copyOf(status); } public static Reindexing empty() { return empty; } public Reindexing with(DocumentType documentType, Status updated) { return new Reindexing(Stream.concat(Stream.of(documentType), status.keySet().stream()) .distinct() .collect(toUnmodifiableMap(type -> type, type -> documentType.equals(type) ? updated : status.get(type)))); } /** Reindexing status per document type, for types where this is known. */ public Map status() { return status; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Reindexing that = (Reindexing) o; return status.equals(that.status); } @Override public int hashCode() { return Objects.hash(status); } @Override public String toString() { return "Reindexing status " + status; } /** * Reindexing status for a single document type, in an application. Immutable. * * Reindexing starts at a given instant, and is progressed by visitors. */ public static class Status { private final Instant startedAt; private final Instant endedAt; private final String progress; private final State state; private final String message; Status(Instant startedAt, Instant endedAt, String progress, State state, String message) { this.startedAt = startedAt; this.endedAt = endedAt; this.progress = progress; this.state = state; this.message = message; } /** Returns a new, empty status, with no progress or result, in state READY. */ public static Status ready(Instant now) { return new Status(requireNonNull(now), null, null, State.READY, null); } /** Returns a copy of this, in state RUNNING. */ public Status running() { if (state != State.READY && state != State.FAILED) throw new IllegalStateException("Current state must be READY or FAILED when changing to RUNNING"); return new Status(startedAt, null, progress, State.RUNNING, null); } /** Returns a copy of this with the given progress. */ public Status progressed(ProgressToken progress) { if (state != State.RUNNING) throw new IllegalStateException("Current state must be RUNNING when updating progress"); synchronized (progress) { return new Status(startedAt, null, progress.serializeToString(), state, null); } } /** Returns a copy of this in state HALTED. */ public Status halted() { if (state != State.RUNNING) throw new IllegalStateException("Current state must be RUNNING when changing to READY"); return new Status(startedAt, null, progress, State.READY, null); } /** Returns a copy of this with the given end instant, in state SUCCESSFUL. */ public Status successful(Instant now) { if (state != State.RUNNING) throw new IllegalStateException("Current state must be RUNNING when changing to SUCCESSFUL"); return new Status(startedAt, requireNonNull(now), null, State.SUCCESSFUL, null); } /** Returns a copy of this with the given end instant and failure message, in state FAILED. */ public Status failed(Instant now, String message) { if (state != State.RUNNING) throw new IllegalStateException("Current state must be RUNNING when changing to FAILED"); return new Status(startedAt, requireNonNull(now), progress, State.FAILED, requireNonNull(message)); } public Instant startedAt() { return startedAt; } public Optional endedAt() { return Optional.ofNullable(endedAt); } public Optional progress() { return Optional.ofNullable(progress).map(ProgressToken::fromSerializedString); } public State state() { return state; } public Optional message() { return Optional.ofNullable(message); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; return startedAt.equals(status.startedAt) && Objects.equals(endedAt, status.endedAt) && Objects.equals(progress().map(ProgressToken::serializeToString), status.progress().map(ProgressToken::serializeToString)) && state == status.state && Objects.equals(message, status.message); } @Override public int hashCode() { return Objects.hash(startedAt, endedAt, progress().map(ProgressToken::serializeToString), state, message); } @Override public String toString() { return state + (message != null ? " (" + message + ")" : "") + ", started at " + startedAt + (endedAt != null ? ", ended at " + endedAt : "") + (progress != null ? ", with progress " + progress : ""); } } public enum State { /** Visit ready to be started. */ READY, /** Visit currently running. */ RUNNING, /** Visit completed successfully. */ SUCCESSFUL, /** Visit failed fatally. */ FAILED } public static class Trigger { private final DocumentType type; private final Instant readyAt; private final double speed; public Trigger(DocumentType type, Instant readyAt, double speed) { this.type = requireNonNull(type); this.readyAt = requireNonNull(readyAt); this.speed = speed; } public DocumentType type() { return type; } public Instant readyAt() { return readyAt; } public double speed() { return speed; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Trigger trigger = (Trigger) o; return Double.compare(trigger.speed, speed) == 0 && type.equals(trigger.type) && readyAt.equals(trigger.readyAt); } @Override public int hashCode() { return Objects.hash(type, readyAt, speed); } @Override public String toString() { return "Trigger{" + "type=" + type + ", readyAt=" + readyAt + ", speed=" + speed + '}'; } } }