summaryrefslogtreecommitdiffstats
path: root/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java
blob: 2426ca6c5e83f4f177d55244d6004d7a6db213c3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.concurrent.maintenance;

import com.google.common.util.concurrent.UncheckedTimeoutException;
import com.yahoo.net.HostName;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The base class for maintainers. A maintainer is some job which runs at a fixed rate to perform maintenance tasks.
 *
 * <p>Each instance owns a single-threaded scheduled executor. The executor is started from the constructor and
 * invokes {@link #run()} at the configured interval. The first run happens after a start delay that is staggered
 * across the given cluster hosts. Use {@link #shutdown()} or {@link #awaitShutdown()} to stop the executor.
 *
 * @author bratseth
 * @author mpolden
 * @author jonmv
 */
public abstract class Maintainer implements Runnable {

    protected final Logger log = Logger.getLogger(this.getClass().getName());

    private final String name;
    private final Mode mode;
    private final JobControl jobControl;
    private final JobMetrics jobMetrics;
    private final Duration interval;
    private final ScheduledExecutorService service;
    private final AtomicBoolean shutDown = new AtomicBoolean();

    /**
     * Creates this maintainer and immediately schedules it for periodic execution.
     *
     * <p>Note to subclasses: scheduling starts from this constructor, and the computed initial delay may be zero.
     * As a result, {@link #maintain()} may run before a subclass constructor has completed.
     *
     * @param name             the job name, or null to use the simple class name (see {@link #name()})
     * @param mode             whether completing a run on any node is sufficient ({@link Mode#shared})
     *                         or a run is always required ({@link Mode#exclusive})
     * @param interval         the interval between runs; must be positive
     * @param startedAt        the instant used to align the staggered initial delay
     * @param jobControl       decides whether the job is active, and provides the job lock
     * @param jobMetrics       receives run and success metrics for this job
     * @param clusterHostnames hostnames of the cluster; the index of this host among them staggers the start
     * @throws NullPointerException     if any argument other than {@code name} is null
     * @throws IllegalArgumentException if {@code interval} is zero or negative
     */
    public Maintainer(String name, Mode mode, Duration interval, Instant startedAt, JobControl jobControl,
                      JobMetrics jobMetrics, List<String> clusterHostnames) {
        this.name = name;
        this.mode = Objects.requireNonNull(mode);
        this.interval = requireInterval(interval);
        this.jobControl = Objects.requireNonNull(jobControl);
        this.jobMetrics = Objects.requireNonNull(jobMetrics);
        Objects.requireNonNull(startedAt);
        Objects.requireNonNull(clusterHostnames);
        Duration initialDelay = staggeredDelay(interval, startedAt, HostName.getLocalhost(), clusterHostnames);
        service = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, name() + "-worker"));
        service.scheduleAtFixedRate(this, initialDelay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
        jobControl.started(name(), this);
    }

    /** Entry point for the scheduler: runs this job if it is active, see {@link #lockAndMaintain(boolean)}. */
    @Override
    public void run() {
        lockAndMaintain(false);
    }

    /**
     * Starts shutdown of this, typically by shutting down executors. {@link #awaitShutdown()} waits for shutdown
     * to complete. Only the first call has any effect.
     */
    public void shutdown() {
        if ( ! shutDown.getAndSet(true))
            service.shutdown();
    }

    /**
     * Waits up to 30 seconds for shutdown to complete. Calls {@link #shutdown} first if that has not already
     * been done. Logs a warning if the executor has not terminated within the timeout.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is interrupted
     *                          while waiting; the thread's interrupt status is restored before throwing
     */
    public void awaitShutdown() {
        shutdown();
        var timeout = Duration.ofSeconds(30);
        try {
            if (!service.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                log.log(Level.WARNING, "Maintainer " + name() + " failed to shutdown " +
                                       "within " + timeout);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    @Override
    public final String toString() { return name(); }

    /** Called once each time this maintenance job should run. Returns whether the maintenance run was successful */
    protected abstract boolean maintain();

    /** Returns the interval at which this job is set to run */
    protected Duration interval() { return interval; }

    /**
     * Runs {@link #maintain()} while holding the job lock obtained from the job control.
     *
     * <p>Every attempted run is recorded in the job metrics. A success is recorded if {@link #maintain()}
     * returns true. In {@link Mode#shared} mode, a success is also recorded when an
     * {@code UncheckedTimeoutException} is caught. Metrics are forwarded for this job regardless of the outcome.
     *
     * @param force whether to run even when the job control reports this job as inactive
     */
    public final void lockAndMaintain(boolean force) {
        if (!force && !jobControl.isActive(name())) return;
        log.log(Level.FINE, () -> "Running " + this.getClass().getSimpleName());
        jobMetrics.recordRunOf(name());
        try (var lock = jobControl.lockJob(name())) {
            if (maintain()) jobMetrics.recordSuccessOf(name());
        } catch (UncheckedTimeoutException ignored) {
            if (mode == Mode.shared) {
                // This is fine as we're colliding with a run on another node
                jobMetrics.recordSuccessOf(name());
            }
        } catch (Throwable e) {
            // Catch everything: anything escaping run() would suppress all subsequent fixed-rate executions
            log.log(Level.WARNING, this + " failed. Will retry in " + interval.toMinutes() + " minutes", e);
        } finally {
            jobMetrics.forward(name());
        }
        log.log(Level.FINE, () -> "Finished " + this.getClass().getSimpleName());
    }

    /** Returns the name of this job: the name given at construction, or the simple class name if that was null */
    public final String name() {
        return name == null ? this.getClass().getSimpleName() : name;
    }

    /**
     * Returns the initial delay of this, calculated from the cluster index of the given hostname.
     *
     * <p>If the hostname is not in the list, the full interval is returned. Otherwise the hosts are spread evenly
     * over the interval by their index. The delay is aligned against the epoch millis of {@code now}, so the result
     * is in [0, interval).
     */
    static Duration staggeredDelay(Duration interval, Instant now, String hostname, List<String> clusterHostnames) {
        Objects.requireNonNull(clusterHostnames);
        if ( ! clusterHostnames.contains(hostname))
            return interval;

        long offset = clusterHostnames.indexOf(hostname) * interval.toMillis() / clusterHostnames.size();
        return Duration.ofMillis(Math.floorMod(offset - now.toEpochMilli(), interval.toMillis()));
    }

    /** Returns the given interval, or throws if it is null, zero or negative */
    private static Duration requireInterval(Duration interval) {
        Objects.requireNonNull(interval);
        if (interval.isNegative() || interval.isZero())
            throw new IllegalArgumentException("Interval must be positive, but was " + interval);
        return interval;
    }

    public enum Mode {

        /** Completing a scheduled run on any node is sufficient */
        shared,

        /** Completing a scheduled run is always required */
        exclusive,

    }

}