From 5737387f8c0dd414df16933eb75b944c25d1bb8d Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 7 Oct 2022 10:48:51 +0200 Subject: Expand ZK API, and implement in CuratorWrapper --- .../main/java/com/yahoo/vespa/curator/Curator.java | 2 +- .../com/yahoo/vespa/curator/CuratorWrapper.java | 79 +++++ .../com/yahoo/vespa/curator/SingletonManager.java | 331 +++++++++++++++++++++ .../vespa/curator/api/AbstractSingletonWorker.java | 94 ++++++ .../com/yahoo/vespa/curator/api/VespaCurator.java | 91 +++++- 5 files changed, 595 insertions(+), 2 deletions(-) create mode 100644 zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java create mode 100644 zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java (limited to 'zkfacade/src/main/java/com/yahoo') diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java index 134450edd9a..6c0084a698e 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/Curator.java @@ -64,7 +64,7 @@ public class Curator extends AbstractComponent implements AutoCloseable { private static final File ZK_CLIENT_CONFIG_FILE = new File(Defaults.getDefaults().underVespaHome("conf/zookeeper/zookeeper-client.cfg")); // Note that session timeout has min and max values are related to tickTime defined by server, see configserver.def - private static final Duration ZK_SESSION_TIMEOUT = Duration.ofSeconds(120); + static final Duration ZK_SESSION_TIMEOUT = Duration.ofSeconds(120); private static final Duration ZK_CONNECTION_TIMEOUT = Duration.ofSeconds(30); private static final Duration BASE_SLEEP_TIME = Duration.ofSeconds(1); diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java index 64c1a023a91..66380cfd5f4 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java +++ 
b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java @@ -25,10 +25,69 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator { static final Path userRoot = Path.fromString("user"); private final Curator curator; + private final SingletonManager singletons; @Inject public CuratorWrapper(Curator curator) { + this(curator, Clock.systemUTC(), Duration.ofSeconds(1)); + } + + CuratorWrapper(Curator curator, Clock clock, Duration tickTimeout) { this.curator = curator; + this.singletons = new SingletonManager(curator, clock, tickTimeout); + + curator.framework().getConnectionStateListenable().addListener((curatorFramework, connectionState) -> { + if (connectionState == ConnectionState.LOST) singletons.invalidate(); + }); + } + + @Override + public Optional stat(Path path) { + return curator.getStat(userRoot.append(path)).map(stat -> new Meta(stat.getVersion())); + } + + @Override + public Optional read(Path path) { + Stat stat = new Stat(); + return curator.getData(userRoot.append(path), stat).map(data -> new Data(new Meta(stat.getVersion()), data)); + } + + @Override + public Meta write(Path path, byte[] data) { + return new Meta(curator.set(userRoot.append(path), data).getVersion()); + } + + @Override + public Optional write(Path path, byte[] data, int expectedVersion) { + try { + return Optional.of(new Meta(curator.set(userRoot.append(path), data, expectedVersion).getVersion())); + } + catch (RuntimeException e) { + if (e.getCause() instanceof BadVersionException) return Optional.empty(); + throw e; + } + } + + @Override + public void delete(Path path) { + curator.delete(userRoot.append(path)); + } + + @Override + public boolean delete(Path path, int expectedVersion) { + try { + curator.delete(userRoot.append(path), expectedVersion); + return true; + } + catch (RuntimeException e) { + if (e.getCause() instanceof BadVersionException) return false; + throw e; + } + } + + @Override + public List list(Path path) { + return 
curator.getChildren(userRoot.append(path)); } @Override @@ -40,4 +99,24 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator { return () -> { try(old) { current.close(); } }; } + @Override + public CompletableFuture registerSingleton(String singletonId, SingletonWorker singleton) { + return singletons.register(singletonId, singleton); + } + + @Override + public CompletableFuture unregisterSingleton(SingletonWorker singleton) { + return singletons.unregister(singleton); + } + + @Override + public boolean isActive(String singletonId) { + return singletons.isActive(singletonId); + } + + @Override + public void deconstruct() { + singletons.close(); + } + } diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java new file mode 100644 index 00000000000..72f0fecb695 --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java @@ -0,0 +1,331 @@ +package com.yahoo.vespa.curator; + +import com.yahoo.path.Path; +import com.yahoo.protect.Process; +import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Manages {@link com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker}. 
+ * + * @author jonmv + */ +class SingletonManager implements AutoCloseable { + + private static final Logger logger = Logger.getLogger(SingletonManager.class.getName()); + + private final Curator curator; + private final Clock clock; + private final Duration tickTimeout; + private final Map janitors = new HashMap<>(); + private final Map count = new HashMap<>(); + private final Map registrations = new HashMap<>(); + + SingletonManager(Curator curator, Clock clock, Duration tickTimeout) { + this.curator = curator; + this.clock = clock; + this.tickTimeout = tickTimeout; + } + + synchronized CompletableFuture register(String singletonId, SingletonWorker singleton) { + if (singletonId.contains("/") || singletonId.contains("..")) { + throw new IllegalArgumentException("singleton ID may not contain '/' or '..', but got " + singletonId); + } + String old = registrations.putIfAbsent(singleton, singletonId); + if (old != null) throw new IllegalArgumentException(singleton + " already registered with ID " + old); + count.merge(singletonId, 1, Integer::sum); + return janitors.computeIfAbsent(singletonId, Janitor::new).register(singleton); + } + + synchronized CompletableFuture unregister(SingletonWorker singleton) { + String id = registrations.remove(singleton); + if (id == null) throw new IllegalArgumentException(singleton + " is not registered"); + return janitors.get(id).unregister(singleton) + .whenComplete((__, ___) -> unregistered(id, singleton)); + } + + synchronized void unregistered(String singletonId, SingletonWorker singleton) { + registrations.remove(singleton); + if (count.merge(singletonId, -1, Integer::sum) > 0) return; + count.remove(singletonId); + janitors.remove(singletonId).shutdown(); + } + + /** + * The instant until which this container holds the exclusive lease for activation of singletons with this ID. + * The container may abandon the lease early, if deactivation is triggered and completes before the deadline. 
+ * Unless connection to the underlying ZK cluster is lost, the returned value will regularly move forwards in time. + */ + Optional activeUntil(String singletonId) { + return Optional.ofNullable(janitors.get(singletonId)).map(janitor -> janitor.doom.get()); + } + + /** Whether this container currently holds the activation lease for the given singleton ID. */ + boolean isActive(String singletonId) { + return activeUntil(singletonId).map(clock.instant()::isBefore).orElse(false); + } + + /** Invalidate all leases, due to connection loss. */ + synchronized void invalidate() { + for (Janitor janitor : janitors.values()) janitor.invalidate(); + } + + @Override + public synchronized void close() { + List registered = List.copyOf(registrations.keySet()); + for (SingletonWorker singleton : registered) { + String id = registrations.get(singleton); + logger.log(Level.WARNING, singleton + " still registered with id '" + id + "' at shutdown"); + unregister(singleton); + } + } + + + private class Janitor { + + static class Task { + + enum Type { register, unregister } + + final Type type; + final SingletonWorker singleton; + final CompletableFuture future = new CompletableFuture<>(); + + private Task(Type type, SingletonWorker singleton) { + this.type = type; + this.singleton = singleton; + } + + static Task register(SingletonWorker singleton) { return new Task(Type.register, singleton); } + static Task unregister(SingletonWorker singleton) { return new Task(Type.unregister, singleton); } + + } + + private static final Instant INVALID = Instant.ofEpochMilli(-1); + + final BlockingDeque tasks = new LinkedBlockingDeque<>(); + final Deque singletons = new ArrayDeque<>(2); + final AtomicReference doom = new AtomicReference<>(); + final AtomicBoolean shutdown = new AtomicBoolean(); + final Thread worker; + final String id; + final Path path; + Lock lock = null; + boolean active; + + + Janitor(String id) { + this.id = id; + this.path = Path.fromString("/vespa/singleton/v1/" + id); + 
this.worker = new Thread(this::run, "singleton-janitor-" + id + "-"); + + worker.setDaemon(true); + worker.start(); + } + + public void unlock() { + doom.set(null); + if (lock != null) try { + lock.close(); + lock = null; + } + catch (Exception e) { + logger.log(Level.WARNING, "failed closing " + lock, e); + } + } + + @SuppressWarnings("fallthrough") + private void run() { + try { + while ( ! shutdown.get()) { + try { + // Attempt to acquire the lock, and extend our doom. + renewLease(); + // Ensure activation status is set accordingly to doom, or clear state if this fails. + updateStatus(); + // Process the next pending, externally triggered task, if any. + doTask(); + } + catch (InterruptedException e) { + if ( ! shutdown.get()) { + logger.log(Level.WARNING, worker + " interrupted, restarting event loop"); + } + } + } + for (Task task : tasks) task.future.cancel(true); + unlock(); + } + catch (Throwable t) { + Process.logAndDie(worker + " can't continue, shutting down", t); + } + } + + protected void doTask() throws InterruptedException { + Task task = tasks.poll(tickTimeout.toMillis(), TimeUnit.MILLISECONDS); + try { + if (task != null) switch (task.type) { + case register -> { + doRegister(task.singleton); + task.future.complete(null); + } + case unregister -> { + doUnregister(task.singleton); + task.future.complete(null); + } + default -> throw new AssertionError("unknown task type '" + task.type + "'"); + } + } + catch (RuntimeException e) { + logger.log(Level.WARNING, "Uncaught exception in " + worker, e); + task.future.completeExceptionally(e); + } + } + + private void doRegister(SingletonWorker singleton) { + SingletonWorker current = singletons.peek(); + singletons.push(singleton); + if (active) { + RuntimeException e = null; + if (current != null) try { + current.deactivate(); + } + catch (RuntimeException f) { + e = f; + } + try { + singleton.activate(); + } + catch (RuntimeException f) { + if (e == null) e = f; + else e.addSuppressed(f); + } + if (e != 
null) throw e; + } + } + + private void doUnregister(SingletonWorker singleton) { + RuntimeException e = null; + SingletonWorker current = singletons.peek(); + if ( ! singletons.remove(singleton)) return; + if (active && current == singleton) { + try { + singleton.deactivate(); + } + catch (RuntimeException f) { + e = f; + } + if ( ! singletons.isEmpty()) try { + singletons.peek().activate(); + } + catch (RuntimeException f) { + if (e == null) e = f; + else e.addSuppressed(f); + } + } + if (singletons.isEmpty()) { + unlock(); + } + if (e != null) throw e; + } + + /** + * Attempt to acquire the lock, if not held. + * If lock is held, or acquired, ping the ZK cluster to extend our deadline. + */ + private void renewLease() { + if (doom.get() == INVALID) { + unlock(); + } + // Witness value to detect if invalidation occurs between here and successful ping. + Instant ourDoom = doom.get(); + Instant start = clock.instant(); + if (lock == null) try { + lock = curator.lock(path.append("lock"), tickTimeout); + } + catch (RuntimeException e) { + logger.log(Level.FINE, "Failed acquiring lock for '" + path + "' within " + tickTimeout, e); + return; + } + try { + curator.set(path.append("ping"), new byte[0]); + } + catch (RuntimeException e) { + logger.log(Level.FINE, "Failed pinging ZK cluster", e); + return; + } + if ( ! doom.compareAndSet(ourDoom, start.plus(Curator.ZK_SESSION_TIMEOUT.multipliedBy(9).dividedBy(10)))) { + logger.log(Level.FINE, "Deadline changed, invalidating current lease renewal"); + } + } + + /** + * Attempt to activate or deactivate if status has changed. + * If activation fails, we release the lock, to a different container may acquire it. + */ + private void updateStatus() { + Instant ourDoom = doom.get(); + boolean shouldBeActive = ourDoom != null && ! clock.instant().isAfter(ourDoom); + if ( ! active && shouldBeActive) { + try { + active = true; + if ( ! 
singletons.isEmpty()) singletons.peek().activate(); + } + catch (RuntimeException e) { + logger.log(Level.WARNING, "Failed to activate " + singletons.peek() + ", deactivating again", e); + shouldBeActive = false; + } + } + if (active && ! shouldBeActive) { + try { + active = false; + if ( ! singletons.isEmpty()) singletons.peek().deactivate(); + } + catch (RuntimeException e) { + logger.log(Level.WARNING, "Failed to deactivate " + singletons.peek(), e); + } + unlock(); + } + } + + CompletableFuture register(SingletonWorker singleton) { + Task task = Task.register(singleton); + tasks.offer(task); + return task.future; + } + + CompletableFuture unregister(SingletonWorker singleton) { + Task task = Task.unregister(singleton); + tasks.offer(task); + return task.future; + } + + void invalidate() { + doom.set(INVALID); + } + + void shutdown() { + if ( ! shutdown.compareAndSet(false, true)) { + logger.log(Level.WARNING, "Shutdown called more than once on " + this); + } + } + + } + +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java new file mode 100644 index 00000000000..be0b01e732b --- /dev/null +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/AbstractSingletonWorker.java @@ -0,0 +1,94 @@ +package com.yahoo.vespa.curator.api; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.concurrent.UncheckedTimeoutException; +import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker; +import com.yahoo.yolean.UncheckedInterruptedException; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Super-class for implementations of {@link SingletonWorker}. 
+ * Users should call {@link VespaCurator#registerSingleton} at construction, and + * {@link VespaCurator#unregisterSingleton} at deconstruction. + * If this fails activation on registration, it will immediately unregister as well, before propagating the error. + * Consequently, registering this on construction will allow detecting a failed component generation, and instead + * retain the previous generation, provided enough verification is done in {@link #activate()}. + * The default ID to use with registration is the concrete class name, e.g., {@code my.example.Singleton}. + * + * @author jonmv + */ +public abstract class AbstractSingletonWorker extends AbstractComponent implements SingletonWorker { + + private final AtomicReference owner = new AtomicReference<>(); + + /** + * The singleton ID to use when registering this with a {@link VespaCurator}. + * At most one singleton worker with the given ID will be active, in the cluster, at any time. + * {@link VespaCurator#isActive(String)} may be polled to see whether this container is currently + * allowed to have an active singleton with the given ID. + */ + protected String id() { return getClass().getName(); } + + /** + * Call this at the end of construction of the child or owner. + * If this activates the singleton, this happens synchronously, and any errors are propagated here. + * If this replaces an already active singleton, its deactivation is also called, prior to activation of this. + * If (de)activation is not complete within the given timeout, a timeout exception is thrown. + * If an error occurs (due to failed activation), unregistration is automatically attempted, with the same timeout. + */ + protected final void register(VespaCurator curator, Duration timeout) { + if ( ! 
owner.compareAndSet(null, curator)) { + throw new IllegalArgumentException(this + " is already registered with " + owner.get()); + } + try { + await(curator.registerSingleton(id(), this), timeout, "register"); + } + catch (RuntimeException e) { + try { + unregister(timeout); + } + catch (Exception f) { + e.addSuppressed(f); + } + throw e; + } + } + + /** + * Call this at the start of deconstruction of the child! + * If this singleton is active, deactivation will be called synchronously, and errors propagated here. + * If this also triggers activation of a new singleton, its activation is called after deactivation of this. + * If (de)activation is not complete within the given timeout, a timeout exception is thrown. + */ + protected final void unregister(Duration timeout) { + VespaCurator curator = owner.getAndSet(null); + if (curator == null) { + throw new IllegalArgumentException(this + " was not registered with any owners"); + } + await(curator.unregisterSingleton(this), timeout, "unregister"); + } + + private void await(Future future, Duration timeout, String verb) { + try { + future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + future.cancel(true); + throw new UncheckedInterruptedException("interrupted while " + verb + "ing " + this, e, true); + } + catch (TimeoutException e) { + future.cancel(true); + throw new UncheckedTimeoutException("timed out while " + verb + "ing " + this, e); + } + catch (ExecutionException e) { + throw new RuntimeException("failed to " + verb + " " + this, e.getCause()); + } + } + +} diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java index 32aef2da118..19b86f1c191 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/api/VespaCurator.java @@ -5,16 +5,105 @@ import com.yahoo.concurrent.UncheckedTimeoutException; 
import com.yahoo.path.Path; import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Future; /** * A client for a ZooKeeper cluster running inside Vespa. Applications that want to use ZooKeeper can inject this in * their code. * * @author mpolden + * @author jonmv */ public interface VespaCurator { - /** Create and acquire a re-entrant lock in given path. This blocks until the lock is acquired or timeout elapses. */ + /** Returns the stat for the node at the given path, or empty if no node exists at that path. */ + Optional stat(Path path); + + /** Returns the content and stat for the node at the given path, or empty if no node exists at that path. */ + Optional read(Path path); + + /** Writes the given data to a node at the given path, creating it and its parents as needed, and returns the stat of the modified node. */ + Meta write(Path path, byte[] data); + + /** + * Atomically compares the version in the stat of the node at the given path, with the expected version, and then: + * if they are equal, performs the write operation (see {@link #write(Path, byte[])}); + * otherwise, returns empty. + */ + Optional write(Path path, byte[] data, int expectedVersion); + + /** Recursively deletes any node at the given path, and any children it may have. */ + void delete(Path path); + + /** + * Atomically compares the version in the stat of the node at the given path, with the expected version, and then: + * if they are equal, performs the recursive delete operation (see {@link #delete(Path)}), and returns {@code true}; + * otherwise, returns {@code false}. + */ + boolean delete(Path path, int expectedVersion); + + List list(Path path); + + /** Creates and acquires a re-entrant lock with the given path. This blocks until the lock is acquired or timeout elapses. 
*/ AutoCloseable lock(Path path, Duration timeout) throws UncheckedTimeoutException; + /** Data of a ZK node, including content (possibly empty, never {@code null}) and metadata. */ + record Data(Meta meta, byte[] data) { } + + /** Metadata for a ZK node. */ + record Meta(int version) { } + + /** + * Register the singleton with the framework, so it may become active, and returns a + * synchronisation handle to any deactivations or activations triggered by this. + * If there is already another active singleton with the given ID (in this JVM), + * that will be deactivated before the new one is activated. + */ + Future registerSingleton(String singletonId, SingletonWorker singleton); + + /** + * Unregister with the framework, so this singleton may no longer be active, and returns + * a synchronisation handle to any deactivation or activation triggered by this. + * If this is the last singleton registered with its ID, then this container immediately releases + * the activation lease for that ID, so another container may acquire it. + */ + Future unregisterSingleton(SingletonWorker singleton); + + /** + * Whether this container currently holds the exclusive lease for activation of singletons with this ID. + */ + boolean isActive(String singletonId); + + /** + * Callback interface for processes of which only a single instance should be active at any time, across all + * containers in the cluster, and across all component generations. Notes: + *
    + *
  • {@link #activate()} is called by the system on a singleton whenever it is the newest registered + * singleton in this container, and this container has the lease for the ID with which the singleton + * was registered. See {@link #registerSingleton} and {@link #isActive}.
  • + *
  • {@link #deactivate()} is called by the system on a singleton which is currently active whenever + * the above no longer holds. See {@link #unregisterSingleton}.
  • + *
  • Callbacks for the same ID are always invoked by the same thread, in serial, + * which means the implemented callbacks must return in a timely manner.
  • + *
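  • If activation of a singleton, as a result of the container acquiring the lease at some point, fails, then the container immediately releases the lease, so another container may acquire it instead.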
  • + *
+ * See {@link AbstractSingletonWorker} for an abstract superclass to use for implementations. + */ + interface SingletonWorker { + + /** + * Called by the system whenever this singleton becomes the single active worker. + * If this is triggered because the container obtains the activation lease, and activation fails, + * then the container immediately releases the lease, so another container may acquire it instead. + */ + void activate(); + + /** Called by the system whenever this singleton is no longer the single active worker. */ + void deactivate(); + + } + } -- cgit v1.2.3