aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
blob: 0a1fdb32a843bb02a2080ab2a20e206c54e1a1d6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.reindexing;

import ai.vespa.reindexing.Reindexer.Cluster;
import ai.vespa.reindexing.Reindexing.Trigger;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.yahoo.component.annotation.Inject;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.ZookeepersConfig;
import com.yahoo.component.AbstractComponent;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.jdisc.Metric;
import com.yahoo.net.HostName;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
import java.util.stream.Stream;

import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toMap;

/**
 * Runs in all cluster controller containers, and progresses reindexing efforts.
 * Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting.
 * Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown.
 *
 * @author jonmv
 */
public class ReindexingMaintainer extends AbstractComponent {

    private static final Logger log = Logger.getLogger(Reindexing.class.getName());

    private final Curator curator;
    private final List<Reindexer> reindexers;
    private final ScheduledExecutorService executor;

    @Inject
    public ReindexingMaintainer(@SuppressWarnings("unused") VespaZooKeeperServer ensureZkHasStarted,
                                Metric metric,
                                DocumentAccess access, ZookeepersConfig zookeepersConfig,
                                ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
                                ReindexingConfig reindexingConfig) {
        this(Clock.systemUTC(), metric, access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig);
    }

    ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig,
                         ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
                         ReindexingConfig reindexingConfig) {
        this.curator = Curator.create(zookeepersConfig.zookeeperserverlist());
        ReindexingCurator reindexingCurator = new ReindexingCurator(curator, access.getDocumentTypeManager());
        this.reindexers = reindexingConfig.clusters().entrySet().stream()
                                          .map(cluster -> new Reindexer(parseCluster(cluster.getKey(), clusterListConfig, allClustersBucketSpacesConfig, access.getDocumentTypeManager()),
                                                                        parseReady(cluster.getValue(), access.getDocumentTypeManager()),
                                                                        reindexingCurator,
                                                                        access,
                                                                        metric,
                                                                        clock))
                                          .toList();
        this.executor = new ScheduledThreadPoolExecutor(reindexingConfig.clusters().size(), new DaemonThreadFactory("reindexer-"));
        if (reindexingConfig.enabled())
            scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
                              Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
    }

    private void maintain() {
        for (Reindexer reindexer : reindexers)
            executor.submit(() -> {
                try {
                    reindexer.reindex();
                }
                catch (ReindexingLockException e) {
                    log.log(FINE, "Failed to acquire reindexing lock");
                }
                catch (Exception e) {
                    log.log(WARNING, "Exception when reindexing", e);
                }
            });
    }

    @Override
    public void deconstruct() {
        try {
            for (Reindexer reindexer : reindexers)
                reindexer.shutdown();

            executor.shutdown();

            executor.awaitTermination(5, TimeUnit.SECONDS); // Give it 5s to complete gracefully.

            curator.close(); // Close the underlying curator independently to force shutdown.

            if ( ! executor.isShutdown() && ! executor.awaitTermination(5, TimeUnit.SECONDS))
                log.log(WARNING, "Failed to shut down reindexing within timeout");
        }
        catch (InterruptedException e) {
            log.log(WARNING, "Interrupted while waiting for reindexing to shut down");
            Thread.currentThread().interrupt();
        }
        if ( ! executor.isShutdown()) {
            List<Runnable> remaining = executor.shutdownNow();
            log.log(WARNING, "Number of tasks remaining at hard shutdown: " + remaining.size());
        }

    }

    static List<Trigger> parseReady(ReindexingConfig.Clusters cluster, DocumentTypeManager manager) {
        return cluster.documentTypes().entrySet().stream()
                      .map(typeStatus -> new Trigger(manager.getDocumentType(typeStatus.getKey()),
                                                     Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()),
                                                     typeStatus.getValue().speed()))
                      .toList();
    }

    /** Schedules a task with the given interval (across all containers in this ZK cluster). */
    static void scheduleStaggered(BiConsumer<Long, Long> scheduler,
                                  Duration interval, Instant now,
                                  String hostname, String clusterHostnames) {
        long delayMillis = 0;
        long intervalMillis = interval.toMillis();
        List<String> hostnames = Stream.of(clusterHostnames.split(","))
                                       .map(hostPort -> hostPort.split(":")[0])
                                       .toList();
        if (hostnames.contains(hostname)) {
            long offset = hostnames.indexOf(hostname) * intervalMillis;
            intervalMillis *= hostnames.size();
            delayMillis = Math.floorMod(offset - now.toEpochMilli(), intervalMillis);
        }
        scheduler.accept(delayMillis, intervalMillis);
    }

    static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig bucketSpaces,
                                DocumentTypeManager manager) {
        return clusters.storage().stream()
                       .filter(storage -> storage.name().equals(name))
                       .map(storage -> new Cluster(name,
                                                   bucketSpaces.cluster(name)
                                                               .documentType().entrySet().stream()
                                                               .collect(toMap(entry -> manager.getDocumentType(entry.getKey()),
                                                                         entry -> entry.getValue().bucketSpace()))))
                       .findAny()
                       .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters"));
    }

}