summaryrefslogtreecommitdiffstats
path: root/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java
blob: b038106843e55cced4d2b50f0e831bbbcf93c7d5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.concurrent.maintenance;

import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.net.HostName;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The base class for maintainers. A maintainer is some job which runs at a fixed rate to perform maintenance tasks.
 * <p>
 * Each instance owns a single-threaded scheduled executor which is created and started by the constructor.
 * The first run is delayed by a per-host staggered offset (see {@link #staggeredDelay}) plus 30 seconds,
 * and later runs are scheduled every {@link #interval()}.
 *
 * @author bratseth
 * @author mpolden
 * @author jonmv
 */
public abstract class Maintainer implements Runnable {

    protected final Logger log = Logger.getLogger(this.getClass().getName());

    private final String name;
    private final JobControl jobControl;
    private final JobMetrics jobMetrics;
    private final Duration interval;
    private final ScheduledExecutorService service;
    private final AtomicBoolean shutDown = new AtomicBoolean();
    private final boolean ignoreCollision;

    /**
     * Creates this maintainer, schedules it to run at a fixed rate, and registers it with the given job control.
     *
     * @param name             the name of this job, or null to use the simple name of the concrete class
     * @param interval         the time between runs; must be positive
     * @param startedAt        reference instant used when computing the staggered initial delay
     * @param jobControl       decides whether this job is active and provides the job lock
     * @param jobMetrics       receives the success factor of each completed run
     * @param clusterHostnames hostnames of the cluster, used to stagger the first run across hosts
     * @param ignoreCollision  whether a collision with another run (signalled by an
     *                         {@link UncheckedTimeoutException}) is reported as success instead of logged as a warning
     * @throws NullPointerException     if any argument other than name is null
     * @throws IllegalArgumentException if interval is zero or negative
     */
    public Maintainer(String name, Duration interval, Instant startedAt, JobControl jobControl,
                      JobMetrics jobMetrics, List<String> clusterHostnames, boolean ignoreCollision) {
        this.name = name;
        this.interval = requireInterval(interval);
        this.jobControl = Objects.requireNonNull(jobControl, "jobControl must be non-null");
        this.jobMetrics = Objects.requireNonNull(jobMetrics, "jobMetrics must be non-null");
        this.ignoreCollision = ignoreCollision;
        Objects.requireNonNull(startedAt, "startedAt must be non-null");
        Objects.requireNonNull(clusterHostnames, "clusterHostnames must be non-null");
        Duration initialDelay = staggeredDelay(interval, startedAt, HostName.getLocalhost(), clusterHostnames)
                                .plus(Duration.ofSeconds(30)); // Let the system stabilize before maintenance
        service = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, name() + "-worker"));
        service.scheduleAtFixedRate(this, initialDelay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
        jobControl.started(name(), this);
    }

    /** Runs one scheduled iteration of this job, unless the job has been deactivated in {@link JobControl}. */
    @Override
    public void run() {
        lockAndMaintain(false);
    }

    /** Starts shutdown of this, typically by shutting down executors. {@link #awaitShutdown()} waits for shutdown to complete. */
    public void shutdown() {
        // getAndSet ensures the executor is shut down only by the first caller
        if ( ! shutDown.getAndSet(true))
            service.shutdown();
    }

    /**
     * Waits for shutdown to complete, calling {@link #shutdown} if this hasn't been done already.
     * A warning is logged if the executor has not terminated within 30 seconds.
     *
     * @throws RuntimeException wrapping an {@link InterruptedException} if the calling thread is interrupted
     *                          while waiting; the thread's interrupt status is restored before throwing
     */
    public void awaitShutdown() {
        shutdown();
        var timeout = Duration.ofSeconds(30);
        try {
            if (!service.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                log.log(Level.WARNING, "Maintainer " + name() + " failed to shutdown " +
                                       "within " + timeout);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe the interruption
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Returns whether this is being shut down */
    public final boolean shuttingDown() {
        return shutDown.get();
    }

    @Override
    public final String toString() { return name(); }

    /**
     * Called once each time this maintenance job should run.
     *
     * @return the degree to which the run was successful - a number between 0 (no success), to 1 (complete success).
     *         Note that this indicates whether something is wrong, so e.g if the call did nothing because it should do
     *         nothing,  1.0 should be returned.
     */
    protected abstract double maintain();

    /**
     * Convenience method to convert attempts and failures into a success factor.
     *
     * @return 1.0 when attempts is 0, otherwise {@code 1 - failures / attempts} rounded half-up to two decimals
     */
    protected final double asSuccessFactor(int attempts, int failures) {
        double factor = attempts == 0 ? 1.0 : 1 - (double)failures / attempts;
        return new BigDecimal(factor).setScale(2, RoundingMode.HALF_UP).doubleValue();
    }

    /** Returns the interval at which this job is set to run */
    protected Duration interval() { return interval; }

    /**
     * Runs {@link #maintain()} while holding the job lock, and reports the resulting success factor to job metrics.
     *
     * @param force whether to run even if the job is not active according to {@link JobControl}
     */
    public final void lockAndMaintain(boolean force) {
        if (!force && !jobControl.isActive(name())) return;
        log.log(Level.FINE, () -> "Running " + this.getClass().getSimpleName());

        double successFactor = 0;
        try (var lock = jobControl.lockJob(name())) {
            successFactor = maintain();
        }
        catch (UncheckedTimeoutException e) {
            // NOTE(review): this also catches an UncheckedTimeoutException thrown by maintain() itself,
            // which is then treated as a lock collision as well - confirm that is intended.
            if (ignoreCollision)
                successFactor = 1;
            else
                log.log(Level.WARNING, this + " collided with another run. Will retry in " + interval);
        }
        catch (Throwable e) {
            // Deliberately broad: if a task run by scheduleAtFixedRate throws, all later runs are suppressed
            log.log(Level.WARNING, this + " failed. Will retry in " + interval, e);
        }
        finally {
            jobMetrics.completed(name(), successFactor);
        }
        log.log(Level.FINE, () -> "Finished " + this.getClass().getSimpleName());
    }

    /** Returns the name of this job, falling back to the simple class name when no name was given */
    public final String name() {
        return name == null ? this.getClass().getSimpleName() : name;
    }

    /**
     * Returns the initial delay of this calculated from cluster index of given hostname.
     * Hosts are spread evenly across the interval by their index in clusterHostnames, aligned to epoch time,
     * so that each host runs at a distinct offset. If the hostname is not in the list, the full interval is returned.
     */
    static Duration staggeredDelay(Duration interval, Instant now, String hostname, List<String> clusterHostnames) {
        Objects.requireNonNull(clusterHostnames, "clusterHostnames must be non-null");
        int index = clusterHostnames.indexOf(hostname); // Single scan; -1 when hostname is not a cluster member
        if (index < 0)
            return interval;

        long offset = index * interval.toMillis() / clusterHostnames.size();
        return Duration.ofMillis(Math.floorMod(offset - now.toEpochMilli(), interval.toMillis()));
    }

    /** Returns the given interval if it is non-null and strictly positive, and throws otherwise */
    private static Duration requireInterval(Duration interval) {
        Objects.requireNonNull(interval, "interval must be non-null");
        if (interval.isNegative() || interval.isZero())
            throw new IllegalArgumentException("Interval must be positive, but was " + interval);
        return interval;
    }

}