From ae0051eb76a2600bdcce5a119de06f367be7d164 Mon Sep 17 00:00:00 2001 From: Martin Polden Date: Thu, 9 Jul 2020 11:07:32 +0200 Subject: Control maintenance jobs with feature flag --- .../yahoo/concurrent/maintenance/JobControl.java | 44 ++++------------------ .../concurrent/maintenance/JobControlState.java | 21 +++++++++++ .../maintenance/StringSetSerializer.java | 43 --------------------- .../concurrent/maintenance/JobControlTest.java | 41 ++++++++++---------- 4 files changed, 49 insertions(+), 100 deletions(-) create mode 100644 vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControlState.java delete mode 100644 vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java (limited to 'vespajlib') diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java index 896443117c9..583337203ab 100644 --- a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControl.java @@ -5,11 +5,12 @@ import com.yahoo.transaction.Mutex; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; /** - * Provides status and control over running maintenance jobs. + * Provides status over running maintenance jobs. * * This is multi-thread safe. * @@ -20,11 +21,11 @@ public class JobControl { /** This is not persisted as all nodes start all jobs */ private final Map startedJobs = new ConcurrentSkipListMap<>(); - /** Used to store deactivation in a persistent shared database to make changes take effect on all nodes */ - private final Db db; + /** Used for managing shared persistent state, to make changes take effect on all nodes */ + private final JobControlState state; - public JobControl(Db db) { - this.db = db; + public JobControl(JobControlState state) { + this.state = Objects.requireNonNull(state); } /** Notifies this that a job was started */ @@ -39,25 +40,13 @@ public class JobControl { public Set jobs() { return Collections.unmodifiableSet(startedJobs.keySet()); } /** Returns a snapshot containing the currently inactive jobs in this */ - public Set inactiveJobs() { return db.readInactiveJobs(); } + public Set inactiveJobs() { return state.readInactiveJobs(); } /** Returns true if this job is not currently deactivated */ public boolean isActive(String jobSimpleClassName) { return ! inactiveJobs().contains(jobSimpleClassName); } - /** Set a job active or inactive */ - public void setActive(String jobSimpleClassName, boolean active) { - try (var lock = db.lockInactiveJobs()) { - Set inactiveJobs = db.readInactiveJobs(); - if (active) - inactiveJobs.remove(jobSimpleClassName); - else - inactiveJobs.add(jobSimpleClassName); - db.writeInactiveJobs(inactiveJobs); - } - } - /** Run given job (inactive or not) immediately */ public void run(String jobSimpleClassName) { var job = startedJobs.get(jobSimpleClassName); @@ -67,24 +56,7 @@ public class JobControl { /** Acquire lock for running given job */ public Mutex lockJob(String jobSimpleClassName) { - return db.lockMaintenanceJob(jobSimpleClassName); - } - - /** The database used for managing job state and synchronization */ - public interface Db { - - /** Returns the set of jobs that are temporarily inactive */ - Set readInactiveJobs(); - - /** Make given jobs as inactive */ - void writeInactiveJobs(Set inactiveJobs); - - /** Acquire lock for changing jobs */ - Mutex lockInactiveJobs(); - - /** Acquire lock for running given job */ - Mutex lockMaintenanceJob(String job); - + return state.lockMaintenanceJob(jobSimpleClassName); } } diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControlState.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControlState.java new file mode 100644 index 00000000000..f5bfeee8d0e --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobControlState.java @@ -0,0 +1,21 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent.maintenance; + +import com.yahoo.transaction.Mutex; + +import java.util.Set; + +/** + * Interface for managing job state and synchronization + * + * @author mpolden + */ +public interface JobControlState { + + /** Returns the set of jobs that are temporarily inactive */ + Set readInactiveJobs(); + + /** Acquire lock for running given job */ + Mutex lockMaintenanceJob(String job); + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java deleted file mode 100644 index 9612b5736d3..00000000000 --- a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/StringSetSerializer.java +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.concurrent.maintenance; - -import com.yahoo.slime.ArrayTraverser; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Inspector; -import com.yahoo.slime.Slime; -import com.yahoo.slime.SlimeUtils; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; - -/** - * Serialization of a set of strings to/from Json bytes using Slime. - * - * The set is serialized as an array of string. - * - * @author bratseth - */ -public class StringSetSerializer { - - public byte[] toJson(Set stringSet) { - try { - Slime slime = new Slime(); - Cursor array = slime.setArray(); - for (String element : stringSet) - array.addString(element); - return SlimeUtils.toJsonBytes(slime); - } catch (IOException e) { - throw new RuntimeException("Serialization of a string set failed", e); - } - - } - - public Set fromJson(byte[] data) { - Inspector inspector = SlimeUtils.jsonToSlime(data).get(); - Set stringSet = new HashSet<>(); - inspector.traverse((ArrayTraverser) (index, name) -> stringSet.add(name.asString())); - return stringSet; - } - -} diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java index 3600008b112..0640ab2835a 100644 --- a/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java +++ b/vespajlib/src/test/java/com/yahoo/concurrent/maintenance/JobControlTest.java @@ -21,7 +21,8 @@ public class JobControlTest { @Test public void testJobControl() { - JobControl jobControl = new JobControl(new MockDb()); + MockJobControlState state = new MockJobControlState(); + JobControl jobControl = new JobControl(state); MockMaintainer maintainer1 = new MockMaintainer(); MockMaintainer maintainer2 = new MockMaintainer(); @@ -39,19 +40,19 @@ public class JobControlTest { assertTrue(jobControl.isActive(job1)); assertTrue(jobControl.isActive(job2)); - jobControl.setActive(job1, false); + state.setActive(job1, false); assertFalse(jobControl.isActive(job1)); assertTrue(jobControl.isActive(job2)); - jobControl.setActive(job2, false); + state.setActive(job2, false); assertFalse(jobControl.isActive(job1)); assertFalse(jobControl.isActive(job2)); - jobControl.setActive(job1, true); + state.setActive(job1, true); assertTrue(jobControl.isActive(job1)); assertFalse(jobControl.isActive(job2)); - jobControl.setActive(job2, true); + state.setActive(job2, true); assertTrue(jobControl.isActive(job1)); assertTrue(jobControl.isActive(job2)); @@ -63,14 +64,15 @@ public class JobControlTest { assertEquals(1, maintainer2.maintenanceInvocations); // Running jobs on-demand ignores inactive flag - jobControl.setActive(job1, false); + state.setActive(job1, false); jobControl.run(job1); assertEquals(3, maintainer1.maintenanceInvocations); } @Test public void testJobControlMayDeactivateJobs() { - JobControl jobControl = new JobControl(new MockDb()); + MockJobControlState state = new MockJobControlState(); + JobControl jobControl = new JobControl(state); MockMaintainer mockMaintainer = new MockMaintainer(jobControl); assertTrue(jobControl.jobs().contains("MockMaintainer")); @@ -80,16 +82,16 @@ public class JobControlTest { mockMaintainer.run(); assertEquals(1, mockMaintainer.maintenanceInvocations); - jobControl.setActive("MockMaintainer", false); + state.setActive("MockMaintainer", false); mockMaintainer.run(); assertEquals(1, mockMaintainer.maintenanceInvocations); - jobControl.setActive("MockMaintainer", true); + state.setActive("MockMaintainer", true); mockMaintainer.run(); assertEquals(2, mockMaintainer.maintenanceInvocations); } - private static class MockDb implements JobControl.Db { + private static class MockJobControlState implements JobControlState { private final Set inactiveJobs = new HashSet<>(); @@ -99,19 +101,16 @@ public class JobControlTest { } @Override - public void writeInactiveJobs(Set inactiveJobs) { - this.inactiveJobs.clear(); - this.inactiveJobs.addAll(inactiveJobs); - } - - @Override - public Mutex lockInactiveJobs() { + public Mutex lockMaintenanceJob(String job) { return () -> {}; } - @Override - public Mutex lockMaintenanceJob(String job) { - return () -> {}; + public void setActive(String job, boolean active) { + if (active) { + inactiveJobs.remove(job); + } else { + inactiveJobs.add(job); + } } } @@ -125,7 +124,7 @@ public class JobControlTest { } private MockMaintainer() { - this(new JobControl(new MockDb())); + this(new JobControl(new MockJobControlState())); } @Override -- cgit v1.2.3