summaryrefslogtreecommitdiffstats
path: root/vespajlib
diff options
context:
space:
mode:
authorMartin Polden <mpolden@mpolden.no>2020-04-28 19:43:36 +0200
committerMartin Polden <mpolden@mpolden.no>2020-04-29 11:54:42 +0200
commit9ef7e4bb4433f4baa510a0d2b083ec5aa38db93a (patch)
treeff45e610db2305fd973b70bd776628684473a859 /vespajlib
parent2fa59e9ab848e7813c1a2b9de302141eaba48066 (diff)
Extract maintenance package into vespajlib
Diffstat (limited to 'vespajlib')
-rw-r--r--vespajlib/pom.xml1
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java90
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java110
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java43
-rw-r--r--vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java138
-rw-r--r--vespajlib/src/test/java/com/yahoo/concurrent/maintenance/MaintainerTest.java39
6 files changed, 421 insertions, 0 deletions
diff --git a/vespajlib/pom.xml b/vespajlib/pom.xml
index 7631a2af0fb..302a3c6f5bf 100644
--- a/vespajlib/pom.xml
+++ b/vespajlib/pom.xml
@@ -98,6 +98,7 @@
<configuration>
<compilerArgs>
<arg>-Xlint:all</arg>
+ <arg>-Xlint:-try</arg>
<arg>-Werror</arg>
</compilerArgs>
</configuration>
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java
new file mode 100644
index 00000000000..896443117c9
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java
@@ -0,0 +1,90 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent.maintenance;
+
+import com.yahoo.transaction.Mutex;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+/**
+ * Provides status and control over running maintenance jobs.
+ *
+ * This is multi-thread safe.
+ *
+ * @author bratseth
+ */
+public class JobControl {
+
+ /** This is not persisted as all nodes start all jobs */
+ private final Map<String, Maintainer> startedJobs = new ConcurrentSkipListMap<>();
+
+ /** Used to store deactivation in a persistent shared database to make changes take effect on all nodes */
+ private final Db db;
+
+ public JobControl(Db db) {
+ this.db = db;
+ }
+
+ /** Notifies this that a job was started */
+ public void started(String jobSimpleClassName, Maintainer maintainer) {
+ startedJobs.put(jobSimpleClassName, maintainer);
+ }
+
+ /**
+ * Returns an unmodifiable view (not a copy) of the jobs started on this system (whether deactivated or not).
+ * The view is backed by the live job map. Each job is represented by its simple (omitting package) class name.
+ */
+ public Set<String> jobs() { return Collections.unmodifiableSet(startedJobs.keySet()); }
+
+ /** Returns a snapshot containing the currently inactive jobs in this */
+ public Set<String> inactiveJobs() { return db.readInactiveJobs(); }
+
+ /** Returns true if this job is not currently deactivated */
+ public boolean isActive(String jobSimpleClassName) {
+ return ! inactiveJobs().contains(jobSimpleClassName);
+ }
+
+ /** Sets a job active or inactive: reads, updates and writes back the inactive job set while holding its lock */
+ public void setActive(String jobSimpleClassName, boolean active) {
+ try (var lock = db.lockInactiveJobs()) {
+ Set<String> inactiveJobs = db.readInactiveJobs();
+ if (active)
+ inactiveJobs.remove(jobSimpleClassName);
+ else
+ inactiveJobs.add(jobSimpleClassName);
+ db.writeInactiveJobs(inactiveJobs);
+ }
+ }
+
+ /** Run given job (inactive or not) immediately */
+ public void run(String jobSimpleClassName) {
+ var job = startedJobs.get(jobSimpleClassName);
+ if (job == null) throw new IllegalArgumentException("No such job '" + jobSimpleClassName + "'");
+ job.lockAndMaintain();
+ }
+
+ /** Acquire lock for running given job */
+ public Mutex lockJob(String jobSimpleClassName) {
+ return db.lockMaintenanceJob(jobSimpleClassName);
+ }
+
+ /** The database used for managing job state and synchronization */
+ public interface Db {
+
+ /** Returns the set of jobs that are temporarily inactive. Must be mutable: setActive modifies the returned set */
+ Set<String> readInactiveJobs();
+
+ /** Replaces the stored set of inactive jobs with the given set */
+ void writeInactiveJobs(Set<String> inactiveJobs);
+
+ /** Acquire the lock held while changing the set of inactive jobs */
+ Mutex lockInactiveJobs();
+
+ /** Acquire lock for running given job */
+ Mutex lockMaintenanceJob(String job);
+
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java
new file mode 100644
index 00000000000..847be20e963
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/Maintainer.java
@@ -0,0 +1,110 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent.maintenance;
+
+import com.google.common.util.concurrent.UncheckedTimeoutException;
+import com.yahoo.net.HostName;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The base class for maintainers. A maintainer is some job which runs at a fixed rate to perform maintenance tasks.
+ *
+ * @author bratseth
+ * @author mpolden
+ */
+public abstract class Maintainer implements Runnable, AutoCloseable {
+
+ protected final Logger log = Logger.getLogger(this.getClass().getName());
+
+ private final String name;
+ private final JobControl jobControl;
+ private final Duration interval;
+ private final ScheduledExecutorService service;
+
+ public Maintainer(String name, Duration interval, Instant startedAt, JobControl jobControl, List<String> clusterHostnames) {
+ this(name, interval, staggeredDelay(interval, startedAt, HostName.getLocalhost(), clusterHostnames), jobControl);
+ }
+
+ public Maintainer(String name, Duration interval, Duration initialDelay, JobControl jobControl) {
+ this.name = name;
+ this.interval = requireInterval(interval);
+ this.jobControl = Objects.requireNonNull(jobControl);
+ service = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, name() + "-worker"));
+ service.scheduleAtFixedRate(this, initialDelay.toMillis(), interval.toMillis(), TimeUnit.MILLISECONDS);
+ jobControl.started(name(), this);
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (jobControl.isActive(name())) {
+ lockAndMaintain();
+ }
+ } catch (UncheckedTimeoutException ignored) {
+ // Another actor is running this job
+ } catch (Throwable e) {
+ log.log(Level.WARNING, this + " failed. Will retry in " + interval.toMinutes() + " minutes", e);
+ }
+ }
+
+ /** Shuts down the scheduler and waits up to 30 seconds for it to terminate, logging a warning on timeout */
+ @Override
+ public void close() {
+ var timeout = Duration.ofSeconds(30);
+ service.shutdown();
+ try {
+ if (!service.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS))
+ log.log(Level.WARNING, "Maintainer " + name() + " failed to shutdown within " + timeout);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore the interrupt status so callers can still observe it
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public final String toString() { return name(); }
+
+ /** Called once each time this maintenance job should run */
+ protected abstract void maintain();
+
+ /** Returns the interval at which this job is set to run */
+ protected Duration interval() { return interval; }
+
+ /** Run this while holding the job lock */
+ public final void lockAndMaintain() {
+ try (var lock = jobControl.lockJob(name())) {
+ maintain();
+ }
+ }
+
+ /** Returns the simple name of this job */
+ public final String name() {
+ return name == null ? this.getClass().getSimpleName() : name;
+ }
+
+ /** Returns the initial delay of this, derived from the position of given hostname in the cluster host list */
+ static Duration staggeredDelay(Duration interval, Instant now, String hostname, List<String> clusterHostnames) {
+ Objects.requireNonNull(clusterHostnames);
+ int clusterIndex = clusterHostnames.indexOf(hostname);
+ if (clusterIndex < 0) // Unknown host: wait one full interval
+ return interval;
+ long intervalMillis = interval.toMillis();
+ return Duration.ofMillis(Math.floorMod(clusterIndex * intervalMillis / clusterHostnames.size() - now.toEpochMilli(), intervalMillis));
+ }
+
+ private static Duration requireInterval(Duration interval) {
+ Objects.requireNonNull(interval);
+ if (interval.isNegative() || interval.isZero())
+ throw new IllegalArgumentException("Interval must be positive, but was " + interval);
+ return interval;
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java
new file mode 100644
index 00000000000..9612b5736d3
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java
@@ -0,0 +1,43 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent.maintenance;
+
+import com.yahoo.slime.ArrayTraverser;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Serialization of a set of strings to/from Json bytes using Slime.
+ *
+ * The set is serialized as a JSON array of strings.
+ *
+ * @author bratseth
+ */
+public class StringSetSerializer {
+
+ public byte[] toJson(Set<String> stringSet) {
+ try {
+ Slime slime = new Slime();
+ Cursor array = slime.setArray();
+ for (String element : stringSet)
+ array.addString(element);
+ return SlimeUtils.toJsonBytes(slime);
+ } catch (IOException e) {
+ throw new RuntimeException("Serialization of a string set failed", e);
+ }
+
+ }
+
+ public Set<String> fromJson(byte[] data) {
+ Inspector inspector = SlimeUtils.jsonToSlime(data).get();
+ Set<String> stringSet = new HashSet<>();
+ inspector.traverse((ArrayTraverser) (index, name) -> stringSet.add(name.asString()));
+ return stringSet;
+ }
+
+}
diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java
new file mode 100644
index 00000000000..3600008b112
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java
@@ -0,0 +1,138 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent.maintenance;
+
+import com.yahoo.transaction.Mutex;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author bratseth
+ */
+public class JobControlTest {
+
+ @Test
+ public void testJobControl() {
+ JobControl jobControl = new JobControl(new MockDb());
+
+ MockMaintainer maintainer1 = new MockMaintainer();
+ MockMaintainer maintainer2 = new MockMaintainer();
+ assertTrue(jobControl.jobs().isEmpty());
+
+ String job1 = "Job1";
+ String job2 = "Job2";
+
+ jobControl.started(job1, maintainer1);
+ jobControl.started(job2, maintainer2);
+ assertEquals(2, jobControl.jobs().size());
+ assertTrue(jobControl.jobs().contains(job1));
+ assertTrue(jobControl.jobs().contains(job2));
+
+ assertTrue(jobControl.isActive(job1));
+ assertTrue(jobControl.isActive(job2));
+
+ jobControl.setActive(job1, false);
+ assertFalse(jobControl.isActive(job1));
+ assertTrue(jobControl.isActive(job2));
+
+ jobControl.setActive(job2, false);
+ assertFalse(jobControl.isActive(job1));
+ assertFalse(jobControl.isActive(job2));
+
+ jobControl.setActive(job1, true);
+ assertTrue(jobControl.isActive(job1));
+ assertFalse(jobControl.isActive(job2));
+
+ jobControl.setActive(job2, true);
+ assertTrue(jobControl.isActive(job1));
+ assertTrue(jobControl.isActive(job2));
+
+ // Run jobs on-demand
+ jobControl.run(job1);
+ jobControl.run(job1);
+ assertEquals(2, maintainer1.maintenanceInvocations);
+ jobControl.run(job2);
+ assertEquals(1, maintainer2.maintenanceInvocations);
+
+ // Running jobs on-demand ignores inactive flag
+ jobControl.setActive(job1, false);
+ jobControl.run(job1);
+ assertEquals(3, maintainer1.maintenanceInvocations);
+ }
+
+ @Test
+ public void testJobControlMayDeactivateJobs() {
+ JobControl jobControl = new JobControl(new MockDb());
+ MockMaintainer mockMaintainer = new MockMaintainer(jobControl);
+
+ assertTrue(jobControl.jobs().contains("MockMaintainer"));
+
+ assertEquals(0, mockMaintainer.maintenanceInvocations);
+
+ mockMaintainer.run();
+ assertEquals(1, mockMaintainer.maintenanceInvocations);
+
+ jobControl.setActive("MockMaintainer", false);
+ mockMaintainer.run();
+ assertEquals(1, mockMaintainer.maintenanceInvocations);
+
+ jobControl.setActive("MockMaintainer", true);
+ mockMaintainer.run();
+ assertEquals(2, mockMaintainer.maintenanceInvocations);
+ }
+
+ private static class MockDb implements JobControl.Db {
+
+ private final Set<String> inactiveJobs = new HashSet<>();
+
+ @Override
+ public Set<String> readInactiveJobs() {
+ return new HashSet<>(inactiveJobs);
+ }
+
+ @Override
+ public void writeInactiveJobs(Set<String> inactiveJobs) {
+ this.inactiveJobs.clear();
+ this.inactiveJobs.addAll(inactiveJobs);
+ }
+
+ @Override
+ public Mutex lockInactiveJobs() {
+ return () -> {};
+ }
+
+ @Override
+ public Mutex lockMaintenanceJob(String job) {
+ return () -> {};
+ }
+
+ }
+
+ private static class MockMaintainer extends Maintainer {
+
+ int maintenanceInvocations = 0;
+
+ private MockMaintainer(JobControl jobControl) {
+ super(null, Duration.ofHours(1), Instant.now(), jobControl, List.of());
+ }
+
+ private MockMaintainer() {
+ this(new JobControl(new MockDb()));
+ }
+
+ @Override
+ protected void maintain() {
+ maintenanceInvocations++;
+ }
+
+ }
+
+}
diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/MaintainerTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/MaintainerTest.java
new file mode 100644
index 00000000000..820d1fc3d1d
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/MaintainerTest.java
@@ -0,0 +1,39 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent.maintenance;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author freva
+ */
+public class MaintainerTest {
+
+ @Test
+ public void staggering() {
+ List<String> cluster = List.of("cfg1", "cfg2", "cfg3");
+ Duration interval = Duration.ofMillis(300);
+ Instant now = Instant.ofEpochMilli(1000);
+ assertEquals(200, Maintainer.staggeredDelay(interval, now, "cfg1", cluster).toMillis());
+ assertEquals(0, Maintainer.staggeredDelay(interval, now, "cfg2", cluster).toMillis());
+ assertEquals(100, Maintainer.staggeredDelay(interval, now, "cfg3", cluster).toMillis());
+
+ now = Instant.ofEpochMilli(1001);
+ assertEquals(199, Maintainer.staggeredDelay(interval, now, "cfg1", cluster).toMillis());
+ assertEquals(299, Maintainer.staggeredDelay(interval, now, "cfg2", cluster).toMillis());
+ assertEquals(99, Maintainer.staggeredDelay(interval, now, "cfg3", cluster).toMillis());
+
+ now = Instant.ofEpochMilli(1101);
+ assertEquals(99, Maintainer.staggeredDelay(interval, now, "cfg1", cluster).toMillis());
+ assertEquals(199, Maintainer.staggeredDelay(interval, now, "cfg2", cluster).toMillis());
+ assertEquals(299, Maintainer.staggeredDelay(interval, now, "cfg3", cluster).toMillis());
+
+ assertEquals(300, Maintainer.staggeredDelay(interval, now, "cfg0", cluster).toMillis());
+ }
+
+}