diff options
author | Martin Polden <mpolden@mpolden.no> | 2020-04-28 19:43:36 +0200 |
---|---|---|
committer | Martin Polden <mpolden@mpolden.no> | 2020-04-29 11:54:42 +0200 |
commit | 9ef7e4bb4433f4baa510a0d2b083ec5aa38db93a (patch) | |
tree | ff45e610db2305fd973b70bd776628684473a859 /vespajlib | |
parent | 2fa59e9ab848e7813c1a2b9de302141eaba48066 (diff) |
Extract maintenance package into vespajlib
Diffstat (limited to 'vespajlib')
6 files changed, 421 insertions, 0 deletions
diff --git a/vespajlib/pom.xml b/vespajlib/pom.xml index 7631a2af0fb..302a3c6f5bf 100644 --- a/vespajlib/pom.xml +++ b/vespajlib/pom.xml @@ -98,6 +98,7 @@ <configuration> <compilerArgs> <arg>-Xlint:all</arg> + <arg>-Xlint:-try</arg> <arg>-Werror</arg> </compilerArgs> </configuration> diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java new file mode 100644 index 00000000000..896443117c9 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java @@ -0,0 +1,90 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent.maintenance; + +import com.yahoo.transaction.Mutex; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; + +/** + * Provides status and control over running maintenance jobs. + * + * This is multi-thread safe. + * + * @author bratseth + */ +public class JobControl { + + /** This is not persisted as all nodes start all jobs */ + private final Map<String, Maintainer> startedJobs = new ConcurrentSkipListMap<>(); + + /** Used to store deactivation in a persistent shared database to make changes take effect on all nodes */ + private final Db db; + + public JobControl(Db db) { + this.db = db; + } + + /** Notifies this that a job was started */ + public void started(String jobSimpleClassName, Maintainer maintainer) { + startedJobs.put(jobSimpleClassName, maintainer); + } + + /** + * Returns a snapshot of the set of jobs started on this system (whether deactivated or not). + * Each job is represented by its simple (omitting package) class name. + */ + public Set<String> jobs() { return Collections.unmodifiableSet(startedJobs.keySet()); } + + /** Returns a snapshot containing the currently inactive jobs in this */ + public Set<String> inactiveJobs() { return db.readInactiveJobs(); } + + /** Returns true if this job is not currently deactivated */ + public boolean isActive(String jobSimpleClassName) { + return ! inactiveJobs().contains(jobSimpleClassName); + } + + /** Set a job active or inactive */ + public void setActive(String jobSimpleClassName, boolean active) { + try (var lock = db.lockInactiveJobs()) { + Set<String> inactiveJobs = db.readInactiveJobs(); + if (active) + inactiveJobs.remove(jobSimpleClassName); + else + inactiveJobs.add(jobSimpleClassName); + db.writeInactiveJobs(inactiveJobs); + } + } + + /** Run given job (inactive or not) immediately */ + public void run(String jobSimpleClassName) { + var job = startedJobs.get(jobSimpleClassName); + if (job == null) throw new IllegalArgumentException("No such job '" + jobSimpleClassName + "'"); + job.lockAndMaintain(); + } + + /** Acquire lock for running given job */ + public Mutex lockJob(String jobSimpleClassName) { + return db.lockMaintenanceJob(jobSimpleClassName); + } + + /** The database used for managing job state and synchronization */ + public interface Db { + + /** Returns the set of jobs that are temporarily inactive */ + Set<String> readInactiveJobs(); + + /** Make given jobs as inactive */ + void writeInactiveJobs(Set<String> inactiveJobs); + + /** Acquire lock for changing jobs */ + Mutex lockInactiveJobs(); + + /** Acquire lock for running given job */ + Mutex lockMaintenanceJob(String job); + + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java new file mode 100644 index 00000000000..847be20e963 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java @@ -0,0 +1,110 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent.maintenance; + +import com.google.common.util.concurrent.UncheckedTimeoutException; +import com.yahoo.net.HostName; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The base class for maintainers. A maintainer is some job which runs at a fixed rate to perform maintenance tasks. + * + * @author bratseth + * @author mpolden + */ +public abstract class Maintainer implements Runnable, AutoCloseable { + + protected final Logger log = Logger.getLogger(this.getClass().getName()); + + private final String name; + private final JobControl jobControl; + private final Duration interval; + private final ScheduledExecutorService service; + + public Maintainer(String name, Duration interval, Instant startedAt, JobControl jobControl, List<String> clusterHostnames) { + this(name, interval, staggeredDelay(interval, startedAt, HostName.getLocalhost(), clusterHostnames), jobControl); + } + + public Maintainer(String name, Duration interval, Duration initialDelay, JobControl jobControl) { + this.name = name; + this.interval = requireInterval(interval); + this.jobControl = Objects.requireNonNull(jobControl); + service = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, name() + "-worker")); + service.scheduleAtFixedRate(this, initialDelay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS); + jobControl.started(name(), this); + } + + @Override + public void run() { + try { + if (jobControl.isActive(name())) { + lockAndMaintain(); + } + } catch (UncheckedTimeoutException ignored) { + // Another actor is running this job + } catch (Throwable e) { + log.log(Level.WARNING, this + " failed. Will retry in " + interval.toMinutes() + " minutes", e); + } + } + + @Override + public void close() { + var timeout = Duration.ofSeconds(30); + service.shutdown(); + try { + if (!service.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + log.log(Level.WARNING, "Maintainer " + name() + " failed to shutdown " + + "within " + timeout); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public final String toString() { return name(); } + + /** Called once each time this maintenance job should run */ + protected abstract void maintain(); + + /** Returns the interval at which this job is set to run */ + protected Duration interval() { return interval; } + + /** Run this while holding the job lock */ + public final void lockAndMaintain() { + try (var lock = jobControl.lockJob(name())) { + maintain(); + } + } + + /** Returns the simple name of this job */ + public final String name() { + return name == null ? this.getClass().getSimpleName() : name; + } + + /** Returns the initial delay of this calculated from cluster index of given hostname */ + static Duration staggeredDelay(Duration interval, Instant now, String hostname, List<String> clusterHostnames) { + Objects.requireNonNull(clusterHostnames); + if ( ! clusterHostnames.contains(hostname)) + return interval; + + long offset = clusterHostnames.indexOf(hostname) * interval.toMillis() / clusterHostnames.size(); + return Duration.ofMillis(Math.floorMod(offset - now.toEpochMilli(), interval.toMillis())); + } + + private static Duration requireInterval(Duration interval) { + Objects.requireNonNull(interval); + if (interval.isNegative() || interval.isZero()) + throw new IllegalArgumentException("Interval must be positive, but was " + interval); + return interval; + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java new file mode 100644 index 00000000000..9612b5736d3 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java @@ -0,0 +1,43 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent.maintenance; + +import com.yahoo.slime.ArrayTraverser; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Inspector; +import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Serialization of a set of strings to/from Json bytes using Slime. + * + * The set is serialized as an array of string. + * + * @author bratseth + */ +public class StringSetSerializer { + + public byte[] toJson(Set<String> stringSet) { + try { + Slime slime = new Slime(); + Cursor array = slime.setArray(); + for (String element : stringSet) + array.addString(element); + return SlimeUtils.toJsonBytes(slime); + } catch (IOException e) { + throw new RuntimeException("Serialization of a string set failed", e); + } + + } + + public Set<String> fromJson(byte[] data) { + Inspector inspector = SlimeUtils.jsonToSlime(data).get(); + Set<String> stringSet = new HashSet<>(); + inspector.traverse((ArrayTraverser) (index, name) -> stringSet.add(name.asString())); + return stringSet; + } + +} diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java new file mode 100644 index 00000000000..3600008b112 --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java @@ -0,0 +1,138 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent.maintenance; + +import com.yahoo.transaction.Mutex; +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author bratseth + */ +public class JobControlTest { + + @Test + public void testJobControl() { + JobControl jobControl = new JobControl(new MockDb()); + + MockMaintainer maintainer1 = new MockMaintainer(); + MockMaintainer maintainer2 = new MockMaintainer(); + assertTrue(jobControl.jobs().isEmpty()); + + String job1 = "Job1"; + String job2 = "Job2"; + + jobControl.started(job1, maintainer1); + jobControl.started(job2, maintainer2); + assertEquals(2, jobControl.jobs().size()); + assertTrue(jobControl.jobs().contains(job1)); + assertTrue(jobControl.jobs().contains(job2)); + + assertTrue(jobControl.isActive(job1)); + assertTrue(jobControl.isActive(job2)); + + jobControl.setActive(job1, false); + assertFalse(jobControl.isActive(job1)); + assertTrue(jobControl.isActive(job2)); + + jobControl.setActive(job2, false); + assertFalse(jobControl.isActive(job1)); + assertFalse(jobControl.isActive(job2)); + + jobControl.setActive(job1, true); + assertTrue(jobControl.isActive(job1)); + assertFalse(jobControl.isActive(job2)); + + jobControl.setActive(job2, true); + assertTrue(jobControl.isActive(job1)); + assertTrue(jobControl.isActive(job2)); + + // Run jobs on-demand + jobControl.run(job1); + jobControl.run(job1); + assertEquals(2, maintainer1.maintenanceInvocations); + jobControl.run(job2); + assertEquals(1, maintainer2.maintenanceInvocations); + + // Running jobs on-demand ignores inactive flag + jobControl.setActive(job1, false); + jobControl.run(job1); + assertEquals(3, maintainer1.maintenanceInvocations); + } + + @Test + public void testJobControlMayDeactivateJobs() { + JobControl jobControl = new JobControl(new MockDb()); + MockMaintainer mockMaintainer = new MockMaintainer(jobControl); + + assertTrue(jobControl.jobs().contains("MockMaintainer")); + + assertEquals(0, mockMaintainer.maintenanceInvocations); + + mockMaintainer.run(); + assertEquals(1, mockMaintainer.maintenanceInvocations); + + jobControl.setActive("MockMaintainer", false); + mockMaintainer.run(); + assertEquals(1, mockMaintainer.maintenanceInvocations); + + jobControl.setActive("MockMaintainer", true); + mockMaintainer.run(); + assertEquals(2, mockMaintainer.maintenanceInvocations); + } + + private static class MockDb implements JobControl.Db { + + private final Set<String> inactiveJobs = new HashSet<>(); + + @Override + public Set<String> readInactiveJobs() { + return new HashSet<>(inactiveJobs); + } + + @Override + public void writeInactiveJobs(Set<String> inactiveJobs) { + this.inactiveJobs.clear(); + this.inactiveJobs.addAll(inactiveJobs); + } + + @Override + public Mutex lockInactiveJobs() { + return () -> {}; + } + + @Override + public Mutex lockMaintenanceJob(String job) { + return () -> {}; + } + + } + + private static class MockMaintainer extends Maintainer { + + int maintenanceInvocations = 0; + + private MockMaintainer(JobControl jobControl) { + super(null, Duration.ofHours(1), Instant.now(), jobControl, List.of()); + } + + private MockMaintainer() { + this(new JobControl(new MockDb())); + } + + @Override + protected void maintain() { + maintenanceInvocations++; + } + + } + +} diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/MaintainerTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/MaintainerTest.java new file mode 100644 index 00000000000..820d1fc3d1d --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/MaintainerTest.java @@ -0,0 +1,39 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent.maintenance; + +import org.junit.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * @author freva + */ +public class MaintainerTest { + + @Test + public void staggering() { + List<String> cluster = List.of("cfg1", "cfg2", "cfg3"); + Duration interval = Duration.ofMillis(300); + Instant now = Instant.ofEpochMilli(1000); + assertEquals(200, Maintainer.staggeredDelay(interval, now, "cfg1", cluster).toMillis()); + assertEquals(0, Maintainer.staggeredDelay(interval, now, "cfg2", cluster).toMillis()); + assertEquals(100, Maintainer.staggeredDelay(interval, now, "cfg3", cluster).toMillis()); + + now = Instant.ofEpochMilli(1001); + assertEquals(199, Maintainer.staggeredDelay(interval, now, "cfg1", cluster).toMillis()); + assertEquals(299, Maintainer.staggeredDelay(interval, now, "cfg2", cluster).toMillis()); + assertEquals(99, Maintainer.staggeredDelay(interval, now, "cfg3", cluster).toMillis()); + + now = Instant.ofEpochMilli(1101); + assertEquals(99, Maintainer.staggeredDelay(interval, now, "cfg1", cluster).toMillis()); + assertEquals(199, Maintainer.staggeredDelay(interval, now, "cfg2", cluster).toMillis()); + assertEquals(299, Maintainer.staggeredDelay(interval, now, "cfg3", cluster).toMillis()); + + assertEquals(300, Maintainer.staggeredDelay(interval, now, "cfg0", cluster).toMillis()); + } + +} |