diff options
author | Håkon Hallingstad <hakon.hallingstad@gmail.com> | 2022-10-12 11:16:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-12 11:16:44 +0200 |
commit | 1392e1638854131eb91412e31d52e71810be5e98 (patch) | |
tree | e9486d9e367c3dbaf2299e15e9bea468d34a801a | |
parent | cdf3aaddb6d301ae78c9865fa4152c298affa526 (diff) | |
parent | 18b9e3ac1120513f934263a3ca131d6e9e85e7d6 (diff) |
Merge pull request #24400 from vespa-engine/jonmv/singleton-log-and-sample-cod
Jonmv/singleton log and sample cod
6 files changed, 148 insertions, 152 deletions
diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json index 12e165b6cc9..41a1854c276 100644 --- a/zkfacade/abi-spec.json +++ b/zkfacade/abi-spec.json @@ -1,21 +1,4 @@ { - "com.yahoo.vespa.curator.api.AbstractSingletonWorker": { - "superClass": "java.lang.Object", - "interfaces": [ - "com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker" - ], - "attributes": [ - "public", - "abstract" - ], - "methods": [ - "public void <init>()", - "public java.lang.String id()", - "public final void register(com.yahoo.vespa.curator.api.VespaCurator, java.time.Duration)", - "public final void unregister(java.time.Duration)" - ], - "fields": [] - }, "com.yahoo.vespa.curator.api.VespaCurator$Data": { "superClass": "java.lang.Record", "interfaces": [], @@ -61,7 +44,8 @@ ], "methods": [ "public abstract void activate()", - "public abstract void deactivate()" + "public abstract void deactivate()", + "public java.lang.String id()" ], "fields": [] }, @@ -83,8 +67,8 @@ "public abstract boolean delete(com.yahoo.path.Path, int)", "public abstract java.util.List list(com.yahoo.path.Path)", "public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)", - "public abstract java.util.concurrent.Future registerSingleton(java.lang.String, com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)", - "public abstract java.util.concurrent.Future unregisterSingleton(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)", + "public abstract void register(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker, java.time.Duration)", + "public abstract void unregister(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker, java.time.Duration)", "public abstract boolean isActive(java.lang.String)" ], "fields": [] diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java index 2cba6ee6efe..27d969c0c09 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.curator; import com.yahoo.component.AbstractComponent; import com.yahoo.component.annotation.Inject; +import com.yahoo.concurrent.UncheckedTimeoutException; import com.yahoo.jdisc.Metric; import com.yahoo.path.Path; import com.yahoo.vespa.curator.api.VespaCurator; @@ -16,7 +17,9 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * Implementation of {@link VespaCurator} which delegates to a {@link Curator}. @@ -101,21 +104,45 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator { @Override public AutoCloseable lock(Path path, Duration timeout) { - // TODO jonmv: clear up - Lock current, old = curator.lock(path, timeout); - try { current = curator.lock(userRoot.append(path), timeout); } - catch (Throwable t) { old.close(); throw t; } - return () -> { try(old) { current.close(); } }; + return curator.lock(userRoot.append(path), timeout); } @Override - public CompletableFuture<?> registerSingleton(String singletonId, SingletonWorker singleton) { - return singletons.register(singletonId, singleton); + public void register(SingletonWorker singleton, Duration timeout) { + try { + await(singletons.register(singleton.id(), singleton), timeout, "register " + singleton); + } + catch (RuntimeException e) { + try { + unregister(singleton, timeout); + } + catch (Exception f) { + e.addSuppressed(f); + } + throw e; + } } @Override - public CompletableFuture<?> unregisterSingleton(SingletonWorker singleton) { - return singletons.unregister(singleton); + public void unregister(SingletonWorker singleton, Duration timeout) { + await(singletons.unregister(singleton), timeout, "unregister " + singleton); + } + + private void await(Future<?> future, Duration timeout, String action) { + try { + future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + future.cancel(true); + throw new UncheckedInterruptedException("interrupted while " + action, e, true); + } + catch (TimeoutException e) { + future.cancel(true); + throw new UncheckedTimeoutException("timed out while " + action, e); + } + catch (ExecutionException e) { + throw new RuntimeException("failed to " + action, e.getCause()); + } } @Override diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java index 3186c56b002..d2fe6ac6625 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java @@ -140,7 +140,7 @@ class SingletonManager { Janitor(String id) { this.id = id; this.path = Path.fromString("/vespa/singleton/v1/" + id); - this.worker = new Thread(this::run, "singleton-janitor-" + id + "-"); + this.worker = new Thread(this::run, "singleton-janitor-" + id); this.metrics = new MetricHelper(); worker.setDaemon(true); @@ -199,7 +199,7 @@ class SingletonManager { } } catch (RuntimeException e) { - logger.log(Level.WARNING, "Uncaught exception in " + worker, e); + logger.log(Level.WARNING, "Exception attempting to " + task.type + " " + task.singleton + " in " + worker, e); task.future.completeExceptionally(e); } } @@ -374,6 +374,7 @@ class SingletonManager { Instant start = clock.instant(); boolean failed = false; metric.add(ACTIVATION, 1, context); + logger.log(Level.INFO, "Activating singleton with ID " + id); try { activation.run(); } @@ -392,6 +393,7 @@ class SingletonManager { Instant start = clock.instant(); boolean failed = false; metric.add(DEACTIVATION, 1, context); + logger.log(Level.INFO, "Deactivating singleton with ID " + id); try { deactivation.run(); } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java deleted file mode 100644 index dc0540e73c5..00000000000 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java +++ /dev/null @@ -1,95 +0,0 @@ -package com.yahoo.vespa.curator.api; - -import com.yahoo.component.AbstractComponent; -import com.yahoo.concurrent.UncheckedTimeoutException; -import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker; -import com.yahoo.yolean.UncheckedInterruptedException; - -import java.time.Duration; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Super-class for implementations of {@link SingletonWorker}. - * Users should call {@link VespaCurator#registerSingleton} at construction, and - * {@link VespaCurator#unregisterSingleton} at deconstruction. - * If this fails activation on registration, it will immediately unregister as well, before propagating the error. - * Consequently, registering this on construction will allow detecting a failed component generation, and instead - * retain the previous generation, provided enough verification is done in {@link #activate()}. - * The default ID to use with registration is the concrete class name, e.g., {@code my.example.Singleton}. - * - * @author jonmv - */ -public abstract class AbstractSingletonWorker implements SingletonWorker { - - private final AtomicReference<VespaCurator> owner = new AtomicReference<>(); - - /** - * The singleton ID to use when registering this with a {@link VespaCurator}. - * At most one singleton worker with the given ID will be active, in the cluster, at any time. - * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently - * allowed to have an active singleton with the given ID. - */ - public String id() { return getClass().getName(); } - - /** - * <strong>Call this at the end of construction of the owner of this.</strong> - * If this activates the singleton, this happens synchronously, and any errors are propagated here. - * If this replaces an already active singleton, its deactivation is also called, prior to activation of this. - * If (de)activation is not complete within the given timeout, a timeout exception is thrown. - * If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout. - */ - public final void register(VespaCurator curator, Duration timeout) { - if ( ! owner.compareAndSet(null, curator)) { - throw new IllegalArgumentException(this + " is already registered with " + owner.get()); - } - try { - await(curator.registerSingleton(id(), this), timeout, "register"); - } - catch (RuntimeException e) { - try { - unregister(timeout); - } - catch (Exception f) { - e.addSuppressed(f); - } - throw e; - } - } - - /** - * <strong>Call this at the start of deconstruction of the owner of this!</strong> - * <p> - * If this singleton is active, deactivation will be called synchronously, and errors propagated here. - * If this also triggers activation of a new singleton, its activation is called after deactivation of this. - * If (de)activation is not complete within the given timeout, a timeout exception is thrown. - */ - public final void unregister(Duration timeout) { - VespaCurator curator = owner.getAndSet(null); - if (curator == null) { - throw new IllegalArgumentException(this + " was not registered with any owners"); - } - await(curator.unregisterSingleton(this), timeout, "unregister"); - } - - private void await(Future<?> future, Duration timeout, String verb) { - try { - future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { - future.cancel(true); - throw new UncheckedInterruptedException("interrupted while " + verb + "ing " + this, e, true); - } - catch (TimeoutException e) { - future.cancel(true); - throw new UncheckedTimeoutException("timed out while " + verb + "ing " + this, e); - } - catch (ExecutionException e) { - throw new RuntimeException("failed to " + verb + " " + this, e.getCause()); - } - } - -} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java index 6ac44631ed2..f2bc38a4644 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java @@ -7,6 +7,7 @@ import com.yahoo.path.Path; import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; import java.util.concurrent.Future; /** @@ -63,20 +64,29 @@ public interface VespaCurator { record Meta(int version) { } /** - * Register the singleton with the framework, so it may become active, and returns a - * synchronisation handle to any deactivations or activations triggered by this. - * If there is already another active singleton with the given ID (in this JVM), - * that will be deactivated before the new one is activated. + * Register the singleton with the framework, so it may become active. + * <p> + * <strong>Call this after construction of the singleton, typically during component construction!</strong> + * <ul> + * <li>If this activates the singleton, this happens synchronously, and any errors are propagated here.</li> + * <li>If this replaces an already active singleton, its deactivation is also called, prior to activation of this.</li> + * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li> + * <li>If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout.</li> + * </ul> */ - Future<?> registerSingleton(String singletonId, SingletonWorker singleton); + void register(SingletonWorker singleton, Duration timeout); /** * Unregister with the framework, so this singleton may no longer be active, and returns - * a synchronisation handle to any deactivation or activation triggered by this. - * If this is the last singleton registered with its ID, then this container immediately releases - * the activation lease for that ID, so another container may acquire it. + * <p> + * <strong>Call this before deconstruction of the singleton, typically during component deconstruction!</strong> + * <ul> + * <li>If this singleton is active, deactivation will be called synchronously, and errors propagated here.</li> + * <li>If this also triggers activation of a new singleton, its activation is called after deactivation of this.</li> + * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li> + * </ul> */ - Future<?> unregisterSingleton(SingletonWorker singleton); + void unregister(SingletonWorker singleton, Duration timeout); /** * Whether this container currently holds the exclusive lease for activation of singletons with this ID. @@ -85,18 +95,79 @@ public interface VespaCurator { /** * Callback interface for processes of which only a single instance should be active at any time, across all - * containers in the cluster, and across all component generations. Notes: + * containers in the cluster, and across all component generations. + * <p> + * <br> + * Sample usage: + * <pre> + * public class SingletonHolder extends AbstractComponent { + * + * private static final Duration timeout = Duration.ofSeconds(10); + * private final VespaCurator curator; + * private final SingletonWorker singleton; + * + * public SingletonHolder(VespaCurator curator) { + * this.curator = curator; + * this.singleton = new Singleton(); + * curator.register(singleton, timeout); + * } + * + * @Override + * public void deconstruct() { + * curator.unregister(singleton, timeout); + * singleton.shutdown(); + * } + * + * } + * + * public class Singleton implements SingletonWorker { + * + * private final SharedResource resource = ...; // Shared resource that requires exclusive access. + * private final ExecutorService executor = Executors.newSingleThreadExecutor(); + * private final AtomicBoolean running = new AtomicBoolean(); + * private Future<?> future = null; + * + * @Override + * public void activate() { + * resource.open(); // Verify resource works here, and propagate any errors out. + * running.set(true); + * future = executor.submit(this::doWork); + * } + * + * @Override + * public void deactivate() { + * running.set(false); + * try { future.get(10, TimeUnit.SECONDS); } + * catch (Exception e) { ... } + * finally { resource.close(); } + * } + * + * private void doWork() { + * while (running.get()) { ... } // Regularly check whether we should keep running. + * } + * + * public void shutdown() { + * executor.shutdownNow(); // Executor should have no running tasks at this point. + * } + * + * } + * </pre> + * <p> + * <br> + * Notes to implementors: * <ul> * <li>{@link #activate()} is called by the system on a singleton whenever it is the newest registered * singleton in this container, and this container has the lease for the ID with which the singleton - * was registered. See {@link #registerSingleton} and {@link #isActive}.</li> + * was registered. See {@link #id}, {@link #register} and {@link #isActive}.</li> * <li>{@link #deactivate()} is called by the system on a singleton which is currently active whenever - * the above no longer holds. See {@link #unregisterSingleton}.</li> - * <li>Callbacks for the same ID are always invoked by the same thread, in serial; - * the callbacks must return in a timely manner, but are allowed to throw exceptions.</li> + * the above no longer holds. See {@link #unregister}.</li> + * <li>Callbacks for the same ID are always invoked by the same thread, in serial; the callbacks must + * return in a timely manner, but are encouraged to throw exceptions when something's wrong</li> * <li>Activation and deactivation may be triggered by: - * <ol><li>the container acquiring or losing the activation lease; or</li> - * <li>registration of unregistration of a new or obsolete singleton.</li></ol> + * <ol> + * <li>the container acquiring or losing the activation lease; or</li> + * <li>registration of unregistration of a new or obsolete singleton.</li> + * </ol> * Events triggered by the latter happen synchronously, and errors are propagated to the caller for cleanup. * Events triggered by the former may happen in the background, and because the system tries to always have * one activated singleton, exceptions during activation will cause the container to abandon its lease, so @@ -104,7 +175,6 @@ public interface VespaCurator { * </li> * <li>A container without any registered singletons will not attempt to hold the activation lease.</li> * </ul> - * See {@link AbstractSingletonWorker} for an abstract superclass to use for implementations. */ interface SingletonWorker { @@ -118,6 +188,14 @@ public interface VespaCurator { /** Called by the system whenever this singleton is no longer the single active worker. */ void deactivate(); + /** + * The singleton ID to use when registering this with a {@link VespaCurator}. + * At most one singleton worker with the given ID will be active, in the cluster, at any time. + * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently + * allowed to have an active singleton with the given ID. + */ + default String id() { return getClass().getName(); } + } } diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java index 1a5a9413774..d0fef86946c 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java @@ -3,9 +3,9 @@ package com.yahoo.vespa.curator; import com.yahoo.jdisc.test.MockMetric; import com.yahoo.path.Path; import com.yahoo.test.ManualClock; -import com.yahoo.vespa.curator.api.AbstractSingletonWorker; import com.yahoo.vespa.curator.api.VespaCurator; import com.yahoo.vespa.curator.api.VespaCurator.Meta; +import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker; import com.yahoo.vespa.curator.mock.MockCurator; import com.yahoo.vespa.curator.mock.MockCuratorFramework; import org.apache.curator.framework.state.ConnectionState; @@ -61,13 +61,8 @@ public class CuratorWrapperTest { assertEquals(List.of(), curator.list(Path.createRoot())); try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) { - assertEquals(List.of("user", "path"), wrapped.getChildren(Path.createRoot())); assertEquals(List.of("path"), wrapped.getChildren(CuratorWrapper.userRoot)); } - - try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) { - // Both previous locks were released. - } } } @@ -319,8 +314,13 @@ public class CuratorWrapperTest { } } - static class Singleton extends AbstractSingletonWorker { - Singleton(VespaCurator curator) { register(curator, Duration.ofSeconds(2)); } + static class Singleton implements SingletonWorker { + final VespaCurator curator; + Singleton(VespaCurator curator) { + this.curator = curator; + + curator.register(this, Duration.ofSeconds(2)); + } boolean isActive; Phaser phaser = new Phaser(1); @Override public String id() { return "singleton"; } // ... lest anonymous subclasses get different IDs ... ƪ(`▿▿▿▿´ƪ) @@ -334,7 +334,7 @@ public class CuratorWrapperTest { isActive = false; phaser.arriveAndAwaitAdvance(); } - public void shutdown() { unregister(Duration.ofSeconds(2)); } + public void shutdown() { curator.unregister(this, Duration.ofSeconds(2)); } } static void verifyMetrics(Map<String, Double> expected, MockMetric metrics) { |