aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
blob: 0cf7f9eed9a1a2936dec1696f44c73055eb3239b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.reindexing;

import ai.vespa.reindexing.Reindexer.Cluster;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.google.inject.Inject;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.ZookeepersConfig;
import com.yahoo.component.AbstractComponent;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.net.HostName;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
import com.yahoo.vespa.curator.Curator;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Logger;
import java.util.stream.Stream;

import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableMap;

/**
 * Runs in all cluster controller containers, and progresses reindexing efforts.
 * Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting.
 * Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown.
 *
 * @author jonmv
 */
public class ReindexingMaintainer extends AbstractComponent {

    private static final Logger log = Logger.getLogger(ReindexingMaintainer.class.getName());

    private final Curator curator;
    private final Reindexer reindexer;
    private final ScheduledExecutorService executor;

    // VespaZooKeeperServer dependency to ensure the ZK cluster is running.
    @Inject
    public ReindexingMaintainer(VespaZooKeeperServer zooKeeperServer, DocumentAccess access, ZookeepersConfig zookeepersConfig,
                                ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
                                ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
        this(Clock.systemUTC(), access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig);
    }

    /**
     * Creates a maintainer using the given clock, and, if reindexing is enabled in the config,
     * schedules {@link #maintain()} on a single daemon thread, staggered across the ZooKeeper hosts.
     */
    ReindexingMaintainer(Clock clock, DocumentAccess access, ZookeepersConfig zookeepersConfig,
                         ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
                         ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
        DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig);
        // Parse config before opening the ZooKeeper connection, so a config error cannot leak the curator.
        Cluster cluster = parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, manager);
        Map<DocumentType, Instant> ready = parseReady(reindexingConfig, manager);
        // Keep a handle on the curator so its connection can be closed in deconstruct().
        this.curator = Curator.create(zookeepersConfig.zookeeperserverlist());
        this.reindexer = new Reindexer(cluster,
                                       ready,
                                       new ReindexingCurator(curator,
                                                             reindexingConfig.clusterName(),
                                                             manager),
                                       access,
                                       clock);
        this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
        if (reindexingConfig.enabled())
            scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
                              Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
    }

    /**
     * Runs one reindexing pass. Lock contention is logged at FINE; any other exception is logged as a
     * warning and swallowed, since an escaping exception would cancel all future periodic runs.
     */
    private void maintain() {
        try {
            reindexer.reindex();
        }
        catch (ReindexingLockException e) {
            log.log(FINE, "Failed to acquire reindexing lock");
        }
        catch (Exception e) {
            log.log(WARNING, "Exception when reindexing", e);
        }
    }

    /**
     * Stops reindexing: signals the reindexer, shuts down the executor (forcibly if it does not
     * terminate within 45 seconds, or if interrupted while waiting), and finally closes the curator.
     */
    @Override
    public void deconstruct() {
        try {
            reindexer.shutdown();
            executor.shutdown();
            if ( ! executor.awaitTermination(45, TimeUnit.SECONDS)) {
                log.log(WARNING, "Failed to shut down reindexer within timeout");
                executor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            log.log(WARNING, "Interrupted while waiting for reindexer to shut down");
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        finally {
            curator.close();
        }
    }

    /**
     * Maps each document type named in the config's status entries to its configured ready time.
     *
     * @return an unmodifiable map from document type to the instant given by {@code readyAtMillis}
     */
    static Map<DocumentType, Instant> parseReady(ReindexingConfig config, DocumentTypeManager manager) {
        return config.status().entrySet().stream()
                     .collect(toUnmodifiableMap(typeStatus -> manager.getDocumentType(typeStatus.getKey()),
                                                typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
    }

    /**
     * Schedules a task with the given interval (across all containers in this ZK cluster).
     * <p>
     * {@code clusterHostnames} is a comma-separated list of {@code host[:port]} entries. If {@code hostname}
     * is among them, at index {@code i} of {@code n} entries, the task is scheduled with period
     * {@code n * interval}, with the initial delay chosen so that runs land at epoch offsets
     * {@code i * interval} modulo the period. Otherwise, it is scheduled with no delay and the plain interval.
     *
     * @param scheduler receives the initial delay and the period, both in milliseconds
     */
    static void scheduleStaggered(BiConsumer<Long, Long> scheduler,
                                  Duration interval, Instant now,
                                  String hostname, String clusterHostnames) {
        long delayMillis = 0;
        long intervalMillis = interval.toMillis();
        List<String> hostnames = Stream.of(clusterHostnames.split(","))
                                       .map(String::trim)
                                       .map(hostPort -> hostPort.split(":")[0])
                                       .collect(toList());
        int index = hostnames.indexOf(hostname);
        if (index >= 0) {
            long offset = index * intervalMillis;
            intervalMillis *= hostnames.size();
            delayMillis = Math.floorMod(offset - now.toEpochMilli(), intervalMillis);
        }
        scheduler.accept(delayMillis, intervalMillis);
    }

    /**
     * Finds the storage cluster with the given name, and maps each of its document types to its bucket space.
     *
     * @throws IllegalStateException if no storage cluster with the given name is listed
     */
    static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig bucketSpaces,
                                DocumentTypeManager manager) {
        return clusters.storage().stream()
                       .filter(storage -> storage.name().equals(name))
                       .map(storage -> new Cluster(name,
                                                   storage.configid(),
                                                   bucketSpaces.cluster(name)
                                                               .documentType().entrySet().stream()
                                                               .collect(toMap(entry -> manager.getDocumentType(entry.getKey()),
                                                                         entry -> entry.getValue().bucketSpace()))))
                       .findAny()
                       .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters"));
    }

}