// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.concurrent.maintenance; import com.google.common.util.concurrent.UncheckedTimeoutException; import com.yahoo.net.HostName; import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.Objects; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; /** * The base class for maintainers. A maintainer is some job which runs at a fixed rate to perform maintenance tasks. * * @author bratseth * @author mpolden * @author jonmv */ public abstract class Maintainer implements Runnable { protected final Logger log = Logger.getLogger(this.getClass().getName()); private final String name; private final Mode mode; private final JobControl jobControl; private final JobMetrics jobMetrics; private final Duration interval; private final ScheduledExecutorService service; private final AtomicBoolean shutDown = new AtomicBoolean(); public Maintainer(String name, Mode mode, Duration interval, Instant startedAt, JobControl jobControl, JobMetrics jobMetrics, List clusterHostnames) { this.name = name; this.mode = Objects.requireNonNull(mode); this.interval = requireInterval(interval); this.jobControl = Objects.requireNonNull(jobControl); this.jobMetrics = Objects.requireNonNull(jobMetrics); Objects.requireNonNull(startedAt); Objects.requireNonNull(clusterHostnames); Duration initialDelay = staggeredDelay(interval, startedAt, HostName.getLocalhost(), clusterHostnames); service = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, name() + "-worker")); service.scheduleAtFixedRate(this, initialDelay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); jobControl.started(name(), this); } @Override public void run() { lockAndMaintain(false); } /** Starts shutdown of this, typically by shutting down executors. {@link #awaitShutdown()} waits for shutdown to complete. */ public void shutdown() { if ( ! shutDown.getAndSet(true)) service.shutdown(); } /** Waits for shutdown to complete, calling {@link #shutdown} if this hasn't been done already. */ public void awaitShutdown() { shutdown(); var timeout = Duration.ofSeconds(30); try { if (!service.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { log.log(Level.WARNING, "Maintainer " + name() + " failed to shutdown " + "within " + timeout); } } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public final String toString() { return name(); } /** Called once each time this maintenance job should run. Returns whether the maintenance run was successful */ protected abstract boolean maintain(); /** Returns the interval at which this job is set to run */ protected Duration interval() { return interval; } /** Run this while holding the job lock */ public final void lockAndMaintain(boolean force) { if (!force && !jobControl.isActive(name())) return; log.log(Level.FINE, () -> "Running " + this.getClass().getSimpleName()); jobMetrics.recordRunOf(name()); try (var lock = jobControl.lockJob(name())) { if (maintain()) jobMetrics.recordSuccessOf(name()); } catch (UncheckedTimeoutException ignored) { if (mode == Mode.shared) { // This is fine as we're colliding with a run on another node jobMetrics.recordSuccessOf(name()); } } catch (Throwable e) { log.log(Level.WARNING, this + " failed. Will retry in " + interval.toMinutes() + " minutes", e); } finally { jobMetrics.forward(name()); } log.log(Level.FINE, () -> "Finished " + this.getClass().getSimpleName()); } /** Returns the simple name of this job */ public final String name() { return name == null ? this.getClass().getSimpleName() : name; } /** Returns the initial delay of this calculated from cluster index of given hostname */ static Duration staggeredDelay(Duration interval, Instant now, String hostname, List clusterHostnames) { Objects.requireNonNull(clusterHostnames); if ( ! clusterHostnames.contains(hostname)) return interval; long offset = clusterHostnames.indexOf(hostname) * interval.toMillis() / clusterHostnames.size(); return Duration.ofMillis(Math.floorMod(offset - now.toEpochMilli(), interval.toMillis())); } private static Duration requireInterval(Duration interval) { Objects.requireNonNull(interval); if (interval.isNegative() || interval.isZero()) throw new IllegalArgumentException("Interval must be positive, but was " + interval); return interval; } public enum Mode { /** Completing a scheduled run on any node is sufficient */ shared, /** Completing a scheduled run is always required */ exclusive, } }