summaryrefslogtreecommitdiffstats
path: root/zkfacade
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2022-10-12 10:51:21 +0200
committerjonmv <venstad@gmail.com>2022-10-12 10:51:21 +0200
commit18b9e3ac1120513f934263a3ca131d6e9e85e7d6 (patch)
tree0e3b35a095194ae8d02f295f117ec1aa67f2f3a2 /zkfacade
parent1c6192ff71f635a25b069c7761948105f1ae6dae (diff)
Kill AbstractSingletonWorker, and add sample code
Diffstat (limited to 'zkfacade')
-rw-r--r--zkfacade/abi-spec.json24
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java39
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java6
-rw-r--r--zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java112
-rw-r--r--zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java13
5 files changed, 147 insertions, 47 deletions
diff --git a/zkfacade/abi-spec.json b/zkfacade/abi-spec.json
index 12e165b6cc9..41a1854c276 100644
--- a/zkfacade/abi-spec.json
+++ b/zkfacade/abi-spec.json
@@ -1,21 +1,4 @@
{
- "com.yahoo.vespa.curator.api.AbstractSingletonWorker": {
- "superClass": "java.lang.Object",
- "interfaces": [
- "com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker"
- ],
- "attributes": [
- "public",
- "abstract"
- ],
- "methods": [
- "public void <init>()",
- "public java.lang.String id()",
- "public final void register(com.yahoo.vespa.curator.api.VespaCurator, java.time.Duration)",
- "public final void unregister(java.time.Duration)"
- ],
- "fields": []
- },
"com.yahoo.vespa.curator.api.VespaCurator$Data": {
"superClass": "java.lang.Record",
"interfaces": [],
@@ -61,7 +44,8 @@
],
"methods": [
"public abstract void activate()",
- "public abstract void deactivate()"
+ "public abstract void deactivate()",
+ "public java.lang.String id()"
],
"fields": []
},
@@ -83,8 +67,8 @@
"public abstract boolean delete(com.yahoo.path.Path, int)",
"public abstract java.util.List list(com.yahoo.path.Path)",
"public abstract java.lang.AutoCloseable lock(com.yahoo.path.Path, java.time.Duration)",
- "public abstract java.util.concurrent.Future registerSingleton(java.lang.String, com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
- "public abstract java.util.concurrent.Future unregisterSingleton(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker)",
+ "public abstract void register(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker, java.time.Duration)",
+ "public abstract void unregister(com.yahoo.vespa.curator.api.VespaCurator$SingletonWorker, java.time.Duration)",
"public abstract boolean isActive(java.lang.String)"
],
"fields": []
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
index 832755de6eb..27d969c0c09 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java
@@ -2,6 +2,7 @@ package com.yahoo.vespa.curator;
import com.yahoo.component.AbstractComponent;
import com.yahoo.component.annotation.Inject;
+import com.yahoo.concurrent.UncheckedTimeoutException;
import com.yahoo.jdisc.Metric;
import com.yahoo.path.Path;
import com.yahoo.vespa.curator.api.VespaCurator;
@@ -16,7 +17,9 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
/**
* Implementation of {@link VespaCurator} which delegates to a {@link Curator}.
@@ -105,13 +108,41 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator {
}
@Override
- public CompletableFuture<?> registerSingleton(String singletonId, SingletonWorker singleton) {
- return singletons.register(singletonId, singleton);
+ public void register(SingletonWorker singleton, Duration timeout) {
+ try {
+ await(singletons.register(singleton.id(), singleton), timeout, "register " + singleton);
+ }
+ catch (RuntimeException e) {
+ try {
+ unregister(singleton, timeout);
+ }
+ catch (Exception f) {
+ e.addSuppressed(f);
+ }
+ throw e;
+ }
}
@Override
- public CompletableFuture<?> unregisterSingleton(SingletonWorker singleton) {
- return singletons.unregister(singleton);
+ public void unregister(SingletonWorker singleton, Duration timeout) {
+ await(singletons.unregister(singleton), timeout, "unregister " + singleton);
+ }
+
+ private void await(Future<?> future, Duration timeout, String action) {
+ try {
+ future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ future.cancel(true);
+ throw new UncheckedInterruptedException("interrupted while " + action, e, true);
+ }
+ catch (TimeoutException e) {
+ future.cancel(true);
+ throw new UncheckedTimeoutException("timed out while " + action, e);
+ }
+ catch (ExecutionException e) {
+ throw new RuntimeException("failed to " + action, e.getCause());
+ }
}
@Override
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
index 3186c56b002..d2fe6ac6625 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java
@@ -140,7 +140,7 @@ class SingletonManager {
Janitor(String id) {
this.id = id;
this.path = Path.fromString("/vespa/singleton/v1/" + id);
- this.worker = new Thread(this::run, "singleton-janitor-" + id + "-");
+ this.worker = new Thread(this::run, "singleton-janitor-" + id);
this.metrics = new MetricHelper();
worker.setDaemon(true);
@@ -199,7 +199,7 @@ class SingletonManager {
}
}
catch (RuntimeException e) {
- logger.log(Level.WARNING, "Uncaught exception in " + worker, e);
+ logger.log(Level.WARNING, "Exception attempting to " + task.type + " " + task.singleton + " in " + worker, e);
task.future.completeExceptionally(e);
}
}
@@ -374,6 +374,7 @@ class SingletonManager {
Instant start = clock.instant();
boolean failed = false;
metric.add(ACTIVATION, 1, context);
+ logger.log(Level.INFO, "Activating singleton with ID " + id);
try {
activation.run();
}
@@ -392,6 +393,7 @@ class SingletonManager {
Instant start = clock.instant();
boolean failed = false;
metric.add(DEACTIVATION, 1, context);
+ logger.log(Level.INFO, "Deactivating singleton with ID " + id);
try {
deactivation.run();
}
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
index 6ac44631ed2..f2bc38a4644 100644
--- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
+++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java
@@ -7,6 +7,7 @@ import com.yahoo.path.Path;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
@@ -63,20 +64,29 @@ public interface VespaCurator {
record Meta(int version) { }
/**
- * Register the singleton with the framework, so it may become active, and returns a
- * synchronisation handle to any deactivations or activations triggered by this.
- * If there is already another active singleton with the given ID (in this JVM),
- * that will be deactivated before the new one is activated.
+ * Register the singleton with the framework, so it may become active.
+ * <p>
+ * <strong>Call this after construction of the singleton, typically during component construction!</strong>
+ * <ul>
+ * <li>If this activates the singleton, this happens synchronously, and any errors are propagated here.</li>
+ * <li>If this replaces an already active singleton, its deactivation is also called, prior to activation of this.</li>
+ * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li>
+ * <li>If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout.</li>
+ * </ul>
*/
- Future<?> registerSingleton(String singletonId, SingletonWorker singleton);
+ void register(SingletonWorker singleton, Duration timeout);
/**
* Unregister with the framework, so this singleton may no longer be active, and returns
- * a synchronisation handle to any deactivation or activation triggered by this.
- * If this is the last singleton registered with its ID, then this container immediately releases
- * the activation lease for that ID, so another container may acquire it.
+ * <p>
+ * <strong>Call this before deconstruction of the singleton, typically during component deconstruction!</strong>
+ * <ul>
+ * <li>If this singleton is active, deactivation will be called synchronously, and errors propagated here.</li>
+ * <li>If this also triggers activation of a new singleton, its activation is called after deactivation of this.</li>
+ * <li>If (de)activation is not complete within the given timeout, a timeout exception is thrown.</li>
+ * </ul>
*/
- Future<?> unregisterSingleton(SingletonWorker singleton);
+ void unregister(SingletonWorker singleton, Duration timeout);
/**
* Whether this container currently holds the exclusive lease for activation of singletons with this ID.
@@ -85,18 +95,79 @@ public interface VespaCurator {
/**
* Callback interface for processes of which only a single instance should be active at any time, across all
- * containers in the cluster, and across all component generations. Notes:
+ * containers in the cluster, and across all component generations.
+ * <p>
+ * <br>
+ * Sample usage:
+ * <pre>
+ * public class SingletonHolder extends AbstractComponent {
+ *
+ * private static final Duration timeout = Duration.ofSeconds(10);
+ * private final VespaCurator curator;
+ * private final SingletonWorker singleton;
+ *
+ * public SingletonHolder(VespaCurator curator) {
+ * this.curator = curator;
+ * this.singleton = new Singleton();
+ * curator.register(singleton, timeout);
+ * }
+ *
+ * &#064;Override
+ * public void deconstruct() {
+ * curator.unregister(singleton, timeout);
+ * singleton.shutdown();
+ * }
+ *
+ * }
+ *
+ * public class Singleton implements SingletonWorker {
+ *
+ * private final SharedResource resource = ...; // Shared resource that requires exclusive access.
+ * private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ * private final AtomicBoolean running = new AtomicBoolean();
+ * private Future&lt;?&gt; future = null;
+ *
+ * &#064;Override
+ * public void activate() {
+ * resource.open(); // Verify resource works here, and propagate any errors out.
+ * running.set(true);
+ * future = executor.submit(this::doWork);
+ * }
+ *
+ * &#064;Override
+ * public void deactivate() {
+ * running.set(false);
+ * try { future.get(10, TimeUnit.SECONDS); }
+ * catch (Exception e) { ... }
+ * finally { resource.close(); }
+ * }
+ *
+ * private void doWork() {
+ * while (running.get()) { ... } // Regularly check whether we should keep running.
+ * }
+ *
+ * public void shutdown() {
+ * executor.shutdownNow(); // Executor should have no running tasks at this point.
+ * }
+ *
+ * }
+ * </pre>
+ * <p>
+ * <br>
+ * Notes to implementors:
* <ul>
* <li>{@link #activate()} is called by the system on a singleton whenever it is the newest registered
* singleton in this container, and this container has the lease for the ID with which the singleton
- * was registered. See {@link #registerSingleton} and {@link #isActive}.</li>
+ * was registered. See {@link #id}, {@link #register} and {@link #isActive}.</li>
* <li>{@link #deactivate()} is called by the system on a singleton which is currently active whenever
- * the above no longer holds. See {@link #unregisterSingleton}.</li>
- * <li>Callbacks for the same ID are always invoked by the same thread, in serial;
- * the callbacks must return in a timely manner, but are allowed to throw exceptions.</li>
+ * the above no longer holds. See {@link #unregister}.</li>
+ * <li>Callbacks for the same ID are always invoked by the same thread, in serial; the callbacks must
+ * return in a timely manner, but are encouraged to throw exceptions when something's wrong</li>
* <li>Activation and deactivation may be triggered by:
- * <ol><li>the container acquiring or losing the activation lease; or</li>
- * <li>registration of unregistration of a new or obsolete singleton.</li></ol>
+ * <ol>
+ * <li>the container acquiring or losing the activation lease; or</li>
+ * <li>registration of unregistration of a new or obsolete singleton.</li>
+ * </ol>
* Events triggered by the latter happen synchronously, and errors are propagated to the caller for cleanup.
* Events triggered by the former may happen in the background, and because the system tries to always have
* one activated singleton, exceptions during activation will cause the container to abandon its lease, so
@@ -104,7 +175,6 @@ public interface VespaCurator {
* </li>
* <li>A container without any registered singletons will not attempt to hold the activation lease.</li>
* </ul>
- * See {@link AbstractSingletonWorker} for an abstract superclass to use for implementations.
*/
interface SingletonWorker {
@@ -118,6 +188,14 @@ public interface VespaCurator {
/** Called by the system whenever this singleton is no longer the single active worker. */
void deactivate();
+ /**
+ * The singleton ID to use when registering this with a {@link VespaCurator}.
+ * At most one singleton worker with the given ID will be active, in the cluster, at any time.
+ * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently
+ * allowed to have an active singleton with the given ID.
+ */
+ default String id() { return getClass().getName(); }
+
}
}
diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
index 030104b82f0..d0fef86946c 100644
--- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
+++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java
@@ -3,9 +3,9 @@ package com.yahoo.vespa.curator;
import com.yahoo.jdisc.test.MockMetric;
import com.yahoo.path.Path;
import com.yahoo.test.ManualClock;
-import com.yahoo.vespa.curator.api.AbstractSingletonWorker;
import com.yahoo.vespa.curator.api.VespaCurator;
import com.yahoo.vespa.curator.api.VespaCurator.Meta;
+import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker;
import com.yahoo.vespa.curator.mock.MockCurator;
import com.yahoo.vespa.curator.mock.MockCuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
@@ -314,8 +314,13 @@ public class CuratorWrapperTest {
}
}
- static class Singleton extends AbstractSingletonWorker {
- Singleton(VespaCurator curator) { register(curator, Duration.ofSeconds(2)); }
+ static class Singleton implements SingletonWorker {
+ final VespaCurator curator;
+ Singleton(VespaCurator curator) {
+ this.curator = curator;
+
+ curator.register(this, Duration.ofSeconds(2));
+ }
boolean isActive;
Phaser phaser = new Phaser(1);
@Override public String id() { return "singleton"; } // ... lest anonymous subclasses get different IDs ... ƪ(`▿▿▿▿´ƪ)
@@ -329,7 +334,7 @@ public class CuratorWrapperTest {
isActive = false;
phaser.arriveAndAwaitAdvance();
}
- public void shutdown() { unregister(Duration.ofSeconds(2)); }
+ public void shutdown() { curator.unregister(this, Duration.ofSeconds(2)); }
}
static void verifyMetrics(Map<String, Double> expected, MockMetric metrics) {