aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåkon Hallingstad <hakon.hallingstad@gmail.com>2022-10-12 11:16:44 +0200
committerGitHub <noreply@github.com>2022-10-12 11:16:44 +0200
commit1392e1638854131eb91412e31d52e71810be5e98 (patch)
treee9486d9e367c3dbaf2299e15e9bea468d34a801a
parentcdf3aaddb6d301ae78c9865fa4152c298affa526 (diff)
parent18b9e3ac1120513f934263a3ca131d6e9e85e7d6 (diff)
Merge pull request #24400 from vespa-engine/jonmv/singleton-log-and-sample-cod
Jonmv/singleton log and sample cod
-rw-r--r--zkfacade/abi-spec.json24
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java45
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java6
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java95
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java112
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java18
6 files changed, 148 insertions, 152 deletions
diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json
index 12e165b6cc9..41a1854c276 100644
--- a/zkfacade/abi-spec.json
+++ b/zkfacade/abi-spec.json
@@ -1,21 +1,4 @@
{
- "com.yahoo.vespa.curator.api.AbstractSingletonWorker": {
- "superClass": "java.lang.Object",
- "interfaces": [
- "com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker"
- ],
- "attributes": [
- "public",
- "abstract"
- ],
- "methods": [
- "public void <init>()",
- "public java.lang.String id()",
- "public final void register(com.yahoo.vespa.curator.api.VespaCurator, java.time.Duration)",
- "public final void unregister(java.time.Duration)"
- ],
- "fields": []
- },
"com.yahoo.vespa.curator.api.VespaCurator$Data": {
"superClass": "java.lang.Record",
"interfaces": [],
@@ -61,7 +44,8 @@
],
"methods": [
"public abstract void activate()",
- "public abstract void deactivate()"
+ "public abstract void deactivate()",
+ "public java.lang.String id()"
],
"fields": []
},
@@ -83,8 +67,8 @@
"public abstract boolean delete(com.yahoo.path.Path, int)",
"public abstract java.util.List list(com.yahoo.path.Path)",
"public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)",
- "public abstract java.util.concurrent.Future registerSingleton(java.lang.String, com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
- "public abstract java.util.concurrent.Future unregisterSingleton(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
+ "public abstract void register(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker, java.time.Duration)",
+ "public abstract void unregister(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker, java.time.Duration)",
"public abstract boolean isActive(java.lang.String)"
],
"fields": []
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
index 2cba6ee6efe..27d969c0c09 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
@@ -2,6 +2,7 @@ package com.yahoo.vespa.curator;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.annotation.Inject;
+import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.jdisc.Metric;
import com.yahoo.path.Path;
import com.yahoo.vespa.curator.api.VespaCurator;
@@ -16,7 +17,9 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Implementation of {@link VespaCurator} which delegates to a {@link Curator}.
@@ -101,21 +104,45 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator {
@Override
public AutoCloseable lock(Path path, Duration timeout) {
- // TODO jonmv: clear up
- Lock current, old = curator.lock(path, timeout);
- try { current = curator.lock(userRoot.append(path), timeout); }
- catch (Throwable t) { old.close(); throw t; }
- return () -> { try(old) { current.close(); } };
+ return curator.lock(userRoot.append(path), timeout);
}
@Override
- public CompletableFuture<?> registerSingleton(String singletonId, SingletonWorker singleton) {
- return singletons.register(singletonId, singleton);
+ public void register(SingletonWorker singleton, Duration timeout) {
+ try {
+ await(singletons.register(singleton.id(), singleton), timeout, "register " + singleton);
+ }
+ catch (RuntimeException e) {
+ try {
+ unregister(singleton, timeout);
+ }
+ catch (Exception f) {
+ e.addSuppressed(f);
+ }
+ throw e;
+ }
}
@Override
- public CompletableFuture<?> unregisterSingleton(SingletonWorker singleton) {
- return singletons.unregister(singleton);
+ public void unregister(SingletonWorker singleton, Duration timeout) {
+ await(singletons.unregister(singleton), timeout, "unregister " + singleton);
+ }
+
+ private void await(Future<?> future, Duration timeout, String action) {
+ try {
+ future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ future.cancel(true);
+ throw new UncheckedInterruptedException("interrupted while " + action, e, true);
+ }
+ catch (TimeoutException e) {
+ future.cancel(true);
+ throw new UncheckedTimeoutException("timed out while " + action, e);
+ }
+ catch (ExecutionException e) {
+ throw new RuntimeException("failed to " + action, e.getCause());
+ }
}
@Override
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
index 3186c56b002..d2fe6ac6625 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
@@ -140,7 +140,7 @@ class SingletonManager {
Janitor(String id) {
this.id = id;
this.path = Path.fromString("/vespa/singleton/v1/" + id);
- this.worker = new Thread(this::run, "singleton-janitor-" + id + "-");
+ this.worker = new Thread(this::run, "singleton-janitor-" + id);
this.metrics = new MetricHelper();
worker.setDaemon(true);
@@ -199,7 +199,7 @@ class SingletonManager {
}
}
catch (RuntimeException e) {
- logger.log(Level.WARNING, "Uncaught exception in " + worker, e);
+ logger.log(Level.WARNING, "Exception attempting to " + task.type + " " + task.singleton + " in " + worker, e);
task.future.completeExceptionally(e);
}
}
@@ -374,6 +374,7 @@ class SingletonManager {
Instant start = clock.instant();
boolean failed = false;
metric.add(ACTIVATION, 1, context);
+ logger.log(Level.INFO, "Activating singleton with ID " + id);
try {
activation.run();
}
@@ -392,6 +393,7 @@ class SingletonManager {
Instant start = clock.instant();
boolean failed = false;
metric.add(DEACTIVATION, 1, context);
+ logger.log(Level.INFO, "Deactivating singleton with ID " + id);
try {
deactivation.run();
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java
deleted file mode 100644
index dc0540e73c5..00000000000
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.yahoo.vespa.curator.api;
-
-import com.yahoo.component.AbstractComponent;
-import com.yahoo.concurrent.UncheckedTimeoutException;
-import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker;
-import com.yahoo.yolean.UncheckedInterruptedException;
-
-import java.time.Duration;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * Super-class for implementations of {@link SingletonWorker}.
- * Users should call {@link VespaCurator#registerSingleton} at construction, and
- * {@link VespaCurator#unregisterSingleton} at deconstruction.
- * If this fails activation on registration, it will immediately unregister as well, before propagating the error.
- * Consequently, registering this on construction will allow detecting a failed component generation, and instead
- * retain the previous generation, provided enough verification is done in {@link #activate()}.
- * The default ID to use with registration is the concrete class name, e.g., {@code my.example.Singleton}.
- *
- * @author jonmv
- */
-public abstract class AbstractSingletonWorker implements SingletonWorker {
-
- private final AtomicReference<VespaCurator> owner = new AtomicReference<>();
-
- /**
- * The singleton ID to use when registering this with a {@link VespaCurator}.
- * At most one singleton worker with the given ID will be active, in the cluster, at any time.
- * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently
- * allowed to have an active singleton with the given ID.
- */
- public String id() { return getClass().getName(); }
-
- /**
- * <strong>Call this at the end of construction of the owner of this.</strong>
- * If this activates the singleton, this happens synchronously, and any errors are propagated here.
- * If this replaces an already active singleton, its deactivation is also called, prior to activation of this.
- * If (de)activation is not complete within the given timeout, a timeout exception is thrown.
- * If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout.
- */
- public final void register(VespaCurator curator, Duration timeout) {
- if ( ! owner.compareAndSet(null, curator)) {
- throw new IllegalArgumentException(this + " is already registered with " + owner.get());
- }
- try {
- await(curator.registerSingleton(id(), this), timeout, "register");
- }
- catch (RuntimeException e) {
- try {
- unregister(timeout);
- }
- catch (Exception f) {
- e.addSuppressed(f);
- }
- throw e;
- }
- }
-
- /**
- * <strong>Call this at the start of deconstruction of the owner of this!</strong>
- * <p>
- * If this singleton is active, deactivation will be called synchronously, and errors propagated here.
- * If this also triggers activation of a new singleton, its activation is called after deactivation of this.
- * If (de)activation is not complete within the given timeout, a timeout exception is thrown.
- */
- public final void unregister(Duration timeout) {
- VespaCurator curator = owner.getAndSet(null);
- if (curator == null) {
- throw new IllegalArgumentException(this + " was not registered with any owners");
- }
- await(curator.unregisterSingleton(this), timeout, "unregister");
- }
-
- private void await(Future<?> future, Duration timeout, String verb) {
- try {
- future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e) {
- future.cancel(true);
- throw new UncheckedInterruptedException("interrupted while " + verb + "ing " + this, e, true);
- }
- catch (TimeoutException e) {
- future.cancel(true);
- throw new UncheckedTimeoutException("timed out while " + verb + "ing " + this, e);
- }
- catch (ExecutionException e) {
- throw new RuntimeException("failed to " + verb + " " + this, e.getCause());
- }
- }
-
-}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
index 6ac44631ed2..f2bc38a4644 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
@@ -7,6 +7,7 @@ import com.yahoo.path.Path;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
@@ -63,20 +64,29 @@ public interface VespaCurator {
record Meta(int version) { }
/**
- * Register the singleton with the framework, so it may become active, and returns a
- * synchronisation handle to any deactivations or activations triggered by this.
- * If there is already another active singleton with the given ID (in this JVM),
- * that will be deactivated before the new one is activated.
+ * Register the singleton with the framework, so it may become active.
+ * <p>
+ * <strong>Call this after construction of the singleton, typically during component construction!</strong>
+ * <ul>
+ * <li>If this activates the singleton, this happens synchronously, and any errors are propagated here.</li>
+ * <li>If this replaces an already active singleton, its deactivation is also called, prior to activation of this.</li>
+ * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li>
+ * <li>If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout.</li>
+ * </ul>
*/
- Future<?> registerSingleton(String singletonId, SingletonWorker singleton);
+ void register(SingletonWorker singleton, Duration timeout);
/**
* Unregister with the framework, so this singleton may no longer be active, and returns
- * a synchronisation handle to any deactivation or activation triggered by this.
- * If this is the last singleton registered with its ID, then this container immediately releases
- * the activation lease for that ID, so another container may acquire it.
+ * <p>
+ * <strong>Call this before deconstruction of the singleton, typically during component deconstruction!</strong>
+ * <ul>
+ * <li>If this singleton is active, deactivation will be called synchronously, and errors propagated here.</li>
+ * <li>If this also triggers activation of a new singleton, its activation is called after deactivation of this.</li>
+ * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li>
+ * </ul>
*/
- Future<?> unregisterSingleton(SingletonWorker singleton);
+ void unregister(SingletonWorker singleton, Duration timeout);
/**
* Whether this container currently holds the exclusive lease for activation of singletons with this ID.
@@ -85,18 +95,79 @@ public interface VespaCurator {
/**
* Callback interface for processes of which only a single instance should be active at any time, across all
- * containers in the cluster, and across all component generations. Notes:
+ * containers in the cluster, and across all component generations.
+ * <p>
+ * <br>
+ * Sample usage:
+ * <pre>
+ * public class SingletonHolder extends AbstractComponent {
+ *
+ * private static final Duration timeout = Duration.ofSeconds(10);
+ * private final VespaCurator curator;
+ * private final SingletonWorker singleton;
+ *
+ * public SingletonHolder(VespaCurator curator) {
+ * this.curator = curator;
+ * this.singleton = new Singleton();
+ * curator.register(singleton, timeout);
+ * }
+ *
+ * &#064;Override
+ * public void deconstruct() {
+ * curator.unregister(singleton, timeout);
+ * singleton.shutdown();
+ * }
+ *
+ * }
+ *
+ * public class Singleton implements SingletonWorker {
+ *
+ * private final SharedResource resource = ...; // Shared resource that requires exclusive access.
+ * private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ * private final AtomicBoolean running = new AtomicBoolean();
+ * private Future&lt;?&gt; future = null;
+ *
+ * &#064;Override
+ * public void activate() {
+ * resource.open(); // Verify resource works here, and propagate any errors out.
+ * running.set(true);
+ * future = executor.submit(this::doWork);
+ * }
+ *
+ * &#064;Override
+ * public void deactivate() {
+ * running.set(false);
+ * try { future.get(10, TimeUnit.SECONDS); }
+ * catch (Exception e) { ... }
+ * finally { resource.close(); }
+ * }
+ *
+ * private void doWork() {
+ * while (running.get()) { ... } // Regularly check whether we should keep running.
+ * }
+ *
+ * public void shutdown() {
+ * executor.shutdownNow(); // Executor should have no running tasks at this point.
+ * }
+ *
+ * }
+ * </pre>
+ * <p>
+ * <br>
+ * Notes to implementors:
* <ul>
* <li>{@link #activate()} is called by the system on a singleton whenever it is the newest registered
* singleton in this container, and this container has the lease for the ID with which the singleton
- * was registered. See {@link #registerSingleton} and {@link #isActive}.</li>
+ * was registered. See {@link #id}, {@link #register} and {@link #isActive}.</li>
* <li>{@link #deactivate()} is called by the system on a singleton which is currently active whenever
- * the above no longer holds. See {@link #unregisterSingleton}.</li>
- * <li>Callbacks for the same ID are always invoked by the same thread, in serial;
- * the callbacks must return in a timely manner, but are allowed to throw exceptions.</li>
+ * the above no longer holds. See {@link #unregister}.</li>
+ * <li>Callbacks for the same ID are always invoked by the same thread, in serial; the callbacks must
+ * return in a timely manner, but are encouraged to throw exceptions when something's wrong</li>
* <li>Activation and deactivation may be triggered by:
- * <ol><li>the container acquiring or losing the activation lease; or</li>
- * <li>registration of unregistration of a new or obsolete singleton.</li></ol>
+ * <ol>
+ * <li>the container acquiring or losing the activation lease; or</li>
+ * <li>registration of unregistration of a new or obsolete singleton.</li>
+ * </ol>
* Events triggered by the latter happen synchronously, and errors are propagated to the caller for cleanup.
* Events triggered by the former may happen in the background, and because the system tries to always have
* one activated singleton, exceptions during activation will cause the container to abandon its lease, so
@@ -104,7 +175,6 @@ public interface VespaCurator {
* </li>
* <li>A container without any registered singletons will not attempt to hold the activation lease.</li>
* </ul>
- * See {@link AbstractSingletonWorker} for an abstract superclass to use for implementations.
*/
interface SingletonWorker {
@@ -118,6 +188,14 @@ public interface VespaCurator {
/** Called by the system whenever this singleton is no longer the single active worker. */
void deactivate();
+ /**
+ * The singleton ID to use when registering this with a {@link VespaCurator}.
+ * At most one singleton worker with the given ID will be active, in the cluster, at any time.
+ * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently
+ * allowed to have an active singleton with the given ID.
+ */
+ default String id() { return getClass().getName(); }
+
}
}
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
index 1a5a9413774..d0fef86946c 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
@@ -3,9 +3,9 @@ package com.yahoo.vespa.curator;
import com.yahoo.jdisc.test.MockMetric;
import com.yahoo.path.Path;
import com.yahoo.test.ManualClock;
-import com.yahoo.vespa.curator.api.AbstractSingletonWorker;
import com.yahoo.vespa.curator.api.VespaCurator;
import com.yahoo.vespa.curator.api.VespaCurator.Meta;
+import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker;
import com.yahoo.vespa.curator.mock.MockCurator;
import com.yahoo.vespa.curator.mock.MockCuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
@@ -61,13 +61,8 @@ public class CuratorWrapperTest {
assertEquals(List.of(), curator.list(Path.createRoot()));
try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) {
- assertEquals(List.of("user", "path"), wrapped.getChildren(Path.createRoot()));
assertEquals(List.of("path"), wrapped.getChildren(CuratorWrapper.userRoot));
}
-
- try (AutoCloseable lock = curator.lock(path, Duration.ofSeconds(1))) {
- // Both previous locks were released.
- }
}
}
@@ -319,8 +314,13 @@ public class CuratorWrapperTest {
}
}
- static class Singleton extends AbstractSingletonWorker {
- Singleton(VespaCurator curator) { register(curator, Duration.ofSeconds(2)); }
+ static class Singleton implements SingletonWorker {
+ final VespaCurator curator;
+ Singleton(VespaCurator curator) {
+ this.curator = curator;
+
+ curator.register(this, Duration.ofSeconds(2));
+ }
boolean isActive;
Phaser phaser = new Phaser(1);
@Override public String id() { return "singleton"; } // ... lest anonymous subclasses get different IDs ... ƪ(`▿▿▿▿´ƪ)
@@ -334,7 +334,7 @@ public class CuratorWrapperTest {
isActive = false;
phaser.arriveAndAwaitAdvance();
}
- public void shutdown() { unregister(Duration.ofSeconds(2)); }
+ public void shutdown() { curator.unregister(this, Duration.ofSeconds(2)); }
}
static void verifyMetrics(Map<String, Double> expected, MockMetric metrics) {