// Extracted from: root/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingCurator.java
// Blob: 5eef93dad23346a957d1ac49e1c667c20d7b8bc4
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.reindexing;

import ai.vespa.reindexing.Reindexing.Status;
import com.google.inject.Inject;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.path.Path;
import com.yahoo.slime.Cursor;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import com.yahoo.slime.SlimeUtils;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.yolean.Exceptions;

import java.time.Instant;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toUnmodifiableMap;

/**
 * Reads and writes status of initiated reindexing jobs.
 *
 * <p>The status is stored as JSON under a fixed path in the given {@link Curator},
 * with one entry per document type.</p>
 *
 * @author jonmv
 */
public class ReindexingCurator {

    // Field names of the serialized JSON. These are part of the stored format:
    // changing any of them makes already stored status unreadable.
    private static final String STATUS = "status";
    private static final String TYPE =  "type";
    private static final String STARTED_MILLIS = "startedMillis";
    private static final String ENDED_MILLIS = "endedMillis";
    private static final String PROGRESS = "progress";
    private static final String STATE = "state";
    private static final String MESSAGE = "message";

    /** Path under which the serialized reindexing status is stored. */
    private static final Path statusPath = Path.fromString("/reindexing/v1/status");

    private final Curator curator;
    private final ReindexingSerializer serializer;

    /**
     * Creates a curator wrapper for reindexing status.
     *
     * @param curator the store to read status from and write status to
     * @param manager used to resolve stored document type names when reading
     */
    public ReindexingCurator(Curator curator, DocumentTypeManager manager) {
        this.curator = curator;
        this.serializer = new ReindexingSerializer(manager);
    }

    /**
     * Returns the stored reindexing status, or {@link Reindexing#empty()} if nothing is stored.
     * Entries whose type name is not recognised by the document type manager are left out.
     */
    public Reindexing readReindexing() {
        return curator.getData(statusPath).map(serializer::deserialize)
                      .orElse(Reindexing.empty());
    }

    /** Serializes the given reindexing status and stores it, replacing whatever was stored before. */
    public void writeReindexing(Reindexing reindexing) {
        curator.set(statusPath, serializer.serialize(reindexing));
    }


    /** Converts {@link Reindexing} to and from its JSON representation. */
    private static class ReindexingSerializer {

        private final DocumentTypeManager types;

        public ReindexingSerializer(DocumentTypeManager types) {
            this.types = types;
        }

        /**
         * Serializes the given reindexing status to JSON: an object with a single array,
         * holding one object per document type. Absent optional values are simply not written.
         */
        private byte[] serialize(Reindexing reindexing) {
            Cursor root = new Slime().setObject();
            Cursor statusArray = root.setArray(STATUS);
            reindexing.status().forEach((type, status) -> {
                Cursor statusObject = statusArray.addObject();
                statusObject.setString(TYPE, type.getName());
                statusObject.setLong(STARTED_MILLIS, status.startedAt().toEpochMilli());
                status.endedAt().ifPresent(endedAt -> statusObject.setLong(ENDED_MILLIS, endedAt.toEpochMilli()));
                status.progress().ifPresent(progress -> statusObject.setString(PROGRESS, progress.serializeToString()));
                statusObject.setString(STATE, toString(status.state()));
                status.message().ifPresent(message -> statusObject.setString(MESSAGE, message));
            });
            return Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root));
        }

        /**
         * Deserializes reindexing status from the JSON written by {@link #serialize(Reindexing)}.
         *
         * @throws NullPointerException if a required field is missing from an entry
         * @throws IllegalArgumentException if an entry has an unknown state
         */
        private Reindexing deserialize(byte[] json) {
            Inspector statusArray = SlimeUtils.jsonToSlimeOrThrow(json).get().field(STATUS);
            // NOTE(review): the filter asks hasDataType while the key uses getDocumentType;
            // confirm these agree for every name, since a null key fails the collection below.
            return new Reindexing(SlimeUtils.entriesStream(statusArray)
                                            .filter(this::hasKnownType) // Forget unknown documents.
                                            .collect(toUnmodifiableMap(object -> require(TYPE, object, field -> types.getDocumentType(field.asString())),
                                                                       ReindexingSerializer::toStatus)));
        }

        /** Returns whether the type name of the given status entry is recognised by the type manager. */
        private boolean hasKnownType(Inspector object) {
            return require(TYPE, object, field -> types.hasDataType(field.asString()));
        }

        /** Reads a single status entry; end time, progress and message are optional and may be null. */
        private static Status toStatus(Inspector object) {
            return new Status(require(STARTED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
                              get(ENDED_MILLIS, object, field -> Instant.ofEpochMilli(field.asLong())),
                              get(PROGRESS, object, field -> ProgressToken.fromSerializedString(field.asString())),
                              require(STATE, object, field -> toState(field.asString())),
                              get(MESSAGE, object, field -> field.asString()));
        }

        /** Returns the mapped value of the named field, or null if the field is not valid in the given object. */
        private static <T> T get(String name, Inspector object, Function<Inspector, T> mapper) {
            Inspector field = object.field(name);
            return field.valid() ? mapper.apply(field) : null;
        }

        /**
         * Returns the mapped value of the named field.
         *
         * @throws NullPointerException naming the field, if it is absent or maps to null
         */
        private static <T> T require(String name, Inspector object, Function<Inspector, T> mapper) {
            return requireNonNull(get(name, object, mapper),
                                  "Missing or null value for required field '" + name + "'");
        }

        /** Returns the serialized name of the given state. */
        private static String toString(Reindexing.State state) {
            switch (state) {
                case READY: return "ready";
                case RUNNING: return "running";
                case SUCCESSFUL: return "successful";
                case FAILED: return "failed";
                default: throw new IllegalArgumentException("Unexpected state '" + state + "'");
            }
        }

        /** Returns the state with the given serialized name; the inverse of {@link #toString(Reindexing.State)}. */
        private static Reindexing.State toState(String value) {
            switch (value) {
                case "ready": return Reindexing.State.READY;
                case "running": return Reindexing.State.RUNNING;
                case "successful": return Reindexing.State.SUCCESSFUL;
                case "failed": return Reindexing.State.FAILED;
                default: throw new IllegalArgumentException("Unknown state '" + value + "'");
            }
        }

    }

}