diff options
author | jonmv <venstad@gmail.com> | 2022-10-12 10:51:21 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2022-10-12 10:51:21 +0200 |
commit | 18b9e3ac1120513f934263a3ca131d6e9e85e7d6 (patch) | |
tree | 0e3b35a095194ae8d02f295f117ec1aa67f2f3a2 /zkfacade/src | |
parent | 1c6192ff71f635a25b069c7761948105f1ae6dae (diff) |
Kill AbstractSingletonWorker, and add sample code
Diffstat (limited to 'zkfacade/src')
4 files changed, 143 insertions, 27 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java index 832755de6eb..27d969c0c09 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.curator; import com.yahoo.component.AbstractComponent; import com.yahoo.component.annotation.Inject; +import com.yahoo.concurrent.UncheckedTimeoutException; import com.yahoo.jdisc.Metric; import com.yahoo.path.Path; import com.yahoo.vespa.curator.api.VespaCurator; @@ -16,7 +17,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Implementation of {@link VespaCurator} which delegates to a {@link Curator}. @@ -105,13 +108,41 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator { } @Override - public CompletableFuture<?> registerSingleton(String singletonId, SingletonWorker singleton) { - return singletons.register(singletonId, singleton); + public void register(SingletonWorker singleton, Duration timeout) { + try { + await(singletons.register(singleton.id(), singleton), timeout, "register " + singleton); + } + catch (RuntimeException e) { + try { + unregister(singleton, timeout); + } + catch (Exception f) { + e.addSuppressed(f); + } + throw e; + } } @Override - public CompletableFuture<?> unregisterSingleton(SingletonWorker singleton) { - return singletons.unregister(singleton); + public void unregister(SingletonWorker singleton, Duration timeout) { + await(singletons.unregister(singleton), timeout, "unregister " + singleton); + } + + private void await(Future<?> future, Duration timeout, String action) { + try { + future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + future.cancel(true); + throw new UncheckedInterruptedException("interrupted while " + action, e, true); + } + catch (TimeoutException e) { + future.cancel(true); + throw new UncheckedTimeoutException("timed out while " + action, e); + } + catch (ExecutionException e) { + throw new RuntimeException("failed to " + action, e.getCause()); + } } @Override diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java index 3186c56b002..d2fe6ac6625 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java @@ -140,7 +140,7 @@ class SingletonManager { Janitor(String id) { this.id = id; this.path = Path.fromString("/vespa/singleton/v1/" + id); - this.worker = new Thread(this::run, "singleton-janitor-" + id + "-"); + this.worker = new Thread(this::run, "singleton-janitor-" + id); this.metrics = new MetricHelper(); worker.setDaemon(true); @@ -199,7 +199,7 @@ class SingletonManager { } } catch (RuntimeException e) { - logger.log(Level.WARNING, "Uncaught exception in " + worker, e); + logger.log(Level.WARNING, "Exception attempting to " + task.type + " " + task.singleton + " in " + worker, e); task.future.completeExceptionally(e); } } @@ -374,6 +374,7 @@ class SingletonManager { Instant start = clock.instant(); boolean failed = false; metric.add(ACTIVATION, 1, context); + logger.log(Level.INFO, "Activating singleton with ID " + id); try { activation.run(); } @@ -392,6 +393,7 @@ class SingletonManager { Instant start = clock.instant(); boolean failed = false; metric.add(DEACTIVATION, 1, context); + logger.log(Level.INFO, "Deactivating singleton with ID " + id); try { deactivation.run(); } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java index 6ac44631ed2..f2bc38a4644 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java @@ -7,6 +7,7 @@ import com.yahoo.path.Path; import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -63,20 +64,29 @@ public interface VespaCurator { record Meta(int version) { } /** - * Register the singleton with the framework, so it may become active, and returns a - * synchronisation handle to any deactivations or activations triggered by this. - * If there is already another active singleton with the given ID (in this JVM), - * that will be deactivated before the new one is activated. + * Register the singleton with the framework, so it may become active. + * <p> + * <strong>Call this after construction of the singleton, typically during component construction!</strong> + * <ul> + * <li>If this activates the singleton, this happens synchronously, and any errors are propagated here.</li> + * <li>If this replaces an already active singleton, its deactivation is also called, prior to activation of this.</li> + * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li> + * <li>If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout.</li> + * </ul> */ - Future<?> registerSingleton(String singletonId, SingletonWorker singleton); + void register(SingletonWorker singleton, Duration timeout); /** * Unregister with the framework, so this singleton may no longer be active, and returns - * a synchronisation handle to any deactivation or activation triggered by this. - * If this is the last singleton registered with its ID, then this container immediately releases - * the activation lease for that ID, so another container may acquire it. + * <p> + * <strong>Call this before deconstruction of the singleton, typically during component deconstruction!</strong> + * <ul> + * <li>If this singleton is active, deactivation will be called synchronously, and errors propagated here.</li> + * <li>If this also triggers activation of a new singleton, its activation is called after deactivation of this.</li> + * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li> + * </ul> */ - Future<?> unregisterSingleton(SingletonWorker singleton); + void unregister(SingletonWorker singleton, Duration timeout); /** * Whether this container currently holds the exclusive lease for activation of singletons with this ID. @@ -85,18 +95,79 @@ public interface VespaCurator { /** * Callback interface for processes of which only a single instance should be active at any time, across all - * containers in the cluster, and across all component generations. Notes: + * containers in the cluster, and across all component generations. + * <p> + * <br> + * Sample usage: + * <pre> + * public class SingletonHolder extends AbstractComponent { + * + * private static final Duration timeout = Duration.ofSeconds(10); + * private final VespaCurator curator; + * private final SingletonWorker singleton; + * + * public SingletonHolder(VespaCurator curator) { + * this.curator = curator; + * this.singleton = new Singleton(); + * curator.register(singleton, timeout); + * } + * + * @Override + * public void deconstruct() { + * curator.unregister(singleton, timeout); + * singleton.shutdown(); + * } + * + * } + * + * public class Singleton implements SingletonWorker { + * + * private final SharedResource resource = ...; // Shared resource that requires exclusive access. + * private final ExecutorService executor = Executors.newSingleThreadExecutor(); + * private final AtomicBoolean running = new AtomicBoolean(); + * private Future<?> future = null; + * + * @Override + * public void activate() { + * resource.open(); // Verify resource works here, and propagate any errors out. + * running.set(true); + * future = executor.submit(this::doWork); + * } + * + * @Override + * public void deactivate() { + * running.set(false); + * try { future.get(10, TimeUnit.SECONDS); } + * catch (Exception e) { ... } + * finally { resource.close(); } + * } + * + * private void doWork() { + * while (running.get()) { ... } // Regularly check whether we should keep running. + * } + * + * public void shutdown() { + * executor.shutdownNow(); // Executor should have no running tasks at this point. + * } + * + * } + * </pre> + * <p> + * <br> + * Notes to implementors: * <ul> * <li>{@link #activate()} is called by the system on a singleton whenever it is the newest registered * singleton in this container, and this container has the lease for the ID with which the singleton - * was registered. See {@link #registerSingleton} and {@link #isActive}.</li> + * was registered. See {@link #id}, {@link #register} and {@link #isActive}.</li> * <li>{@link #deactivate()} is called by the system on a singleton which is currently active whenever - * the above no longer holds. See {@link #unregisterSingleton}.</li> - * <li>Callbacks for the same ID are always invoked by the same thread, in serial; - * the callbacks must return in a timely manner, but are allowed to throw exceptions.</li> + * the above no longer holds. See {@link #unregister}.</li> + * <li>Callbacks for the same ID are always invoked by the same thread, in serial; the callbacks must + * return in a timely manner, but are encouraged to throw exceptions when something's wrong</li> * <li>Activation and deactivation may be triggered by: - * <ol><li>the container acquiring or losing the activation lease; or</li> - * <li>registration of unregistration of a new or obsolete singleton.</li></ol> + * <ol> + * <li>the container acquiring or losing the activation lease; or</li> + * <li>registration of unregistration of a new or obsolete singleton.</li> + * </ol> * Events triggered by the latter happen synchronously, and errors are propagated to the caller for cleanup. * Events triggered by the former may happen in the background, and because the system tries to always have * one activated singleton, exceptions during activation will cause the container to abandon its lease, so @@ -104,7 +175,6 @@ public interface VespaCurator { * </li> * <li>A container without any registered singletons will not attempt to hold the activation lease.</li> * </ul> - * See {@link AbstractSingletonWorker} for an abstract superclass to use for implementations. */ interface SingletonWorker { @@ -118,6 +188,14 @@ public interface VespaCurator { /** Called by the system whenever this singleton is no longer the single active worker. */ void deactivate(); + /** + * The singleton ID to use when registering this with a {@link VespaCurator}. + * At most one singleton worker with the given ID will be active, in the cluster, at any time. + * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently + * allowed to have an active singleton with the given ID. + */ + default String id() { return getClass().getName(); } + } } diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java index 030104b82f0..d0fef86946c 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java @@ -3,9 +3,9 @@ package com.yahoo.vespa.curator; import com.yahoo.jdisc.test.MockMetric; import com.yahoo.path.Path; import com.yahoo.test.ManualClock; -import com.yahoo.vespa.curator.api.AbstractSingletonWorker; import com.yahoo.vespa.curator.api.VespaCurator; import com.yahoo.vespa.curator.api.VespaCurator.Meta; +import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker; import com.yahoo.vespa.curator.mock.MockCurator; import com.yahoo.vespa.curator.mock.MockCuratorFramework; import org.apache.curator.framework.state.ConnectionState; @@ -314,8 +314,13 @@ public class CuratorWrapperTest { } } - static class Singleton extends AbstractSingletonWorker { - Singleton(VespaCurator curator) { register(curator, Duration.ofSeconds(2)); } + static class Singleton implements SingletonWorker { + final VespaCurator curator; + Singleton(VespaCurator curator) { + this.curator = curator; + + curator.register(this, Duration.ofSeconds(2)); + } boolean isActive; Phaser phaser = new Phaser(1); @Override public String id() { return "singleton"; } // ... lest anonymous subclasses get different IDs ... ƪ(`▿▿▿▿´ƪ) @@ -329,7 +334,7 @@ public class CuratorWrapperTest { isActive = false; phaser.arriveAndAwaitAdvance(); } - public void shutdown() { unregister(Duration.ofSeconds(2)); } + public void shutdown() { curator.unregister(this, Duration.ofSeconds(2)); } } static void verifyMetrics(Map<String, Double> expected, MockMetric metrics) { |