diff options
Diffstat (limited to 'zkfacade/src')
3 files changed, 179 insertions, 17 deletions
diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java index ccde53fcf39..31c476b33ae 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/CuratorWrapper.java @@ -2,6 +2,7 @@ package com.yahoo.vespa.curator; import com.yahoo.component.AbstractComponent; import com.yahoo.component.annotation.Inject; +import com.yahoo.jdisc.Metric; import com.yahoo.path.Path; import com.yahoo.vespa.curator.api.VespaCurator; import org.apache.curator.framework.state.ConnectionState; @@ -28,13 +29,13 @@ public class CuratorWrapper extends AbstractComponent implements VespaCurator { private final SingletonManager singletons; @Inject - public CuratorWrapper(Curator curator) { - this(curator, Clock.systemUTC(), Duration.ofSeconds(1)); + public CuratorWrapper(Curator curator, Metric metric) { + this(curator, Clock.systemUTC(), Duration.ofSeconds(1), metric); } - CuratorWrapper(Curator curator, Clock clock, Duration tickTimeout) { + CuratorWrapper(Curator curator, Clock clock, Duration tickTimeout, Metric metric) { this.curator = curator; - this.singletons = new SingletonManager(curator, clock, tickTimeout); + this.singletons = new SingletonManager(curator, clock, tickTimeout, metric); curator.framework().getConnectionStateListenable().addListener((curatorFramework, connectionState) -> { if (connectionState == ConnectionState.LOST) singletons.invalidate(); diff --git a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java index cf7509f8851..9b18a73543b 100644 --- a/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java +++ b/zkfacade/src/main/java/com/yahoo/vespa/curator/SingletonManager.java @@ -1,5 +1,6 @@ package com.yahoo.vespa.curator; +import com.yahoo.jdisc.Metric; import com.yahoo.path.Path; import com.yahoo.protect.Process; import com.yahoo.vespa.curator.api.VespaCurator.SingletonWorker; @@ -38,11 +39,13 @@ class SingletonManager implements AutoCloseable { private final Map<String, Janitor> janitors = new HashMap<>(); private final Map<String, Integer> count = new HashMap<>(); private final Map<SingletonWorker, String> registrations = new IdentityHashMap<>(); + private final Metric metric; - SingletonManager(Curator curator, Clock clock, Duration tickTimeout) { + SingletonManager(Curator curator, Clock clock, Duration tickTimeout, Metric metric) { this.curator = curator; this.clock = clock; this.tickTimeout = tickTimeout; + this.metric = metric; } synchronized CompletableFuture<?> register(String singletonId, SingletonWorker singleton) { @@ -127,14 +130,15 @@ class SingletonManager implements AutoCloseable { final Thread worker; final String id; final Path path; + final MetricHelper metrics; Lock lock = null; boolean active; - Janitor(String id) { this.id = id; this.path = Path.fromString("/vespa/singleton/v1/" + id); this.worker = new Thread(this::run, "singleton-janitor-" + id + "-"); + this.metrics = new MetricHelper(); worker.setDaemon(true); worker.start(); @@ -144,6 +148,7 @@ class SingletonManager implements AutoCloseable { doom.set(null); if (lock != null) try { lock.close(); + metrics.hasLease(false); lock = null; } catch (Exception e) { @@ -203,13 +208,13 @@ class SingletonManager implements AutoCloseable { if (active) { RuntimeException e = null; if (current != null) try { - current.deactivate(); + metrics.deactivation(current::deactivate); } catch (RuntimeException f) { e = f; } try { - singleton.activate(); + metrics.activation(singleton::activate); } catch (RuntimeException f) { if (e == null) e = f; @@ -225,13 +230,13 @@ class SingletonManager implements AutoCloseable { if ( ! singletons.remove(singleton)) return; if (active && current == singleton) { try { - singleton.deactivate(); + metrics.deactivation(singleton::deactivate); } catch (RuntimeException f) { e = f; } if ( ! singletons.isEmpty()) try { - singletons.peek().activate(); + metrics.activation(singletons.peek()::activate); } catch (RuntimeException f) { if (e == null) e = f; @@ -257,6 +262,7 @@ class SingletonManager implements AutoCloseable { Instant start = clock.instant(); if (lock == null) try { lock = curator.lock(path.append("lock"), tickTimeout); + metrics.hasLease(true); } catch (RuntimeException e) { logger.log(Level.FINE, "Failed acquiring lock for '" + path + "' within " + tickTimeout, e); @@ -284,7 +290,7 @@ class SingletonManager implements AutoCloseable { if ( ! active && shouldBeActive) { try { active = true; - if ( ! singletons.isEmpty()) singletons.peek().activate(); + if ( ! singletons.isEmpty()) metrics.activation(singletons.peek()::activate); } catch (RuntimeException e) { logger.log(Level.WARNING, "Failed to activate " + singletons.peek() + ", deactivating again", e); @@ -294,7 +300,7 @@ class SingletonManager implements AutoCloseable { if (active && ! shouldBeActive) { try { active = false; - if ( ! singletons.isEmpty()) singletons.peek().deactivate(); + if ( ! singletons.isEmpty()) metrics.deactivation(singletons.peek()::deactivate); } catch (RuntimeException e) { logger.log(Level.WARNING, "Failed to deactivate " + singletons.peek(), e); @@ -325,6 +331,66 @@ class SingletonManager implements AutoCloseable { } } + private class MetricHelper { + + static final String PREFIX = "singleton."; + static final String HAS_LEASE = PREFIX + "has_lease"; + static final String IS_ACTIVE = PREFIX + "is_active"; + static final String ACTIVATION = PREFIX + "activation.count"; + static final String ACTIVATION_MILLIS = PREFIX + "activation.millis"; + static final String ACTIVATION_FAILURES = PREFIX + "activation.failure.count"; + static final String DEACTIVATION = PREFIX + "deactivation.count"; + static final String DEACTIVATION_MILLIS = PREFIX + "deactivation.millis"; + static final String DEACTIVATION_FAILURES = PREFIX + "deactivation.failure.count"; + + final Metric.Context context; + + MetricHelper() { + this.context = metric.createContext(Map.of("singletonId", id)); + } + + void hasLease(boolean hasLease) { + metric.set(HAS_LEASE, hasLease ? 1 : 0, context); + } + + void activation(Runnable activation) { + Instant start = clock.instant(); + boolean failed = false; + metric.add(ACTIVATION, 1, context); + try { + activation.run(); + } + catch (RuntimeException e) { + failed = true; + throw e; + } + finally { + metric.set(ACTIVATION_MILLIS, Duration.between(start, clock.instant()).toMillis(), context); + if (failed) metric.add(ACTIVATION_FAILURES, 1, context); + else metric.set(IS_ACTIVE, 1, context); + } + } + + void deactivation(Runnable deactivation) { + Instant start = clock.instant(); + boolean failed = false; + metric.add(DEACTIVATION, 1, context); + try { + deactivation.run(); + } + catch (RuntimeException e) { + failed = true; + throw e; + } + finally { + metric.set(DEACTIVATION_MILLIS, Duration.between(start, clock.instant()).toMillis(), context); + if (failed) metric.add(DEACTIVATION_FAILURES, 1, context); + metric.set(IS_ACTIVE, 0, context); + } + } + + } + } } diff --git a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java index bf65045193e..e07ec307108 100644 --- a/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java +++ b/zkfacade/src/test/java/com/yahoo/vespa/curator/CuratorWrapperTest.java @@ -1,5 +1,6 @@ package com.yahoo.vespa.curator; +import com.yahoo.jdisc.test.MockMetric; import com.yahoo.path.Path; import com.yahoo.test.ManualClock; import com.yahoo.vespa.curator.api.AbstractSingletonWorker; @@ -14,9 +15,11 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; @@ -34,7 +37,7 @@ public class CuratorWrapperTest { @Test public void testUserApi() throws Exception { try (Curator wrapped = new MockCurator()) { - CuratorWrapper curator = new CuratorWrapper(wrapped); + CuratorWrapper curator = new CuratorWrapper(wrapped, new MockMetric()); Path path = Path.fromString("path"); assertEquals(Optional.empty(), curator.stat(path)); @@ -81,7 +84,8 @@ public class CuratorWrapperTest { return super.instant(); }; }; - CuratorWrapper curator = new CuratorWrapper(wrapped, clock, Duration.ofMillis(100)); + MockMetric metric = new MockMetric(); + CuratorWrapper curator = new CuratorWrapper(wrapped, clock, Duration.ofMillis(100), metric); // First singleton to register becomes active during construction. Singleton singleton = new Singleton(curator); @@ -95,11 +99,18 @@ public class CuratorWrapperTest { // ... and deactivated as a result of unregistering again. // Singleton can be set up again, but this time, time runs away. + Phaser mark2 = new Phaser(2); // Janitor and helper. + new Thread(() -> { + mark2.arriveAndAwaitAdvance(); // Wait for janitor to call activate. + stunning.arriveAndAwaitAdvance(); // Let janitor measure time spent on activation, while test thread waits for it. + stunning.arriveAndAwaitAdvance(); // Let janitor measure time spent on activation, while test thread waits for it. + }).start(); singleton = new Singleton(curator) { @Override public void activate() { // Set up sync in clock on next renewLease. super.activate(); stunning.register(); + mark2.arrive(); } }; assertTrue(singleton.isActive); @@ -109,26 +120,45 @@ public class CuratorWrapperTest { stunning.arriveAndAwaitAdvance(); // Wait for next updateStatus. clock.advance(Curator.ZK_SESSION_TIMEOUT); singleton.phaser.register(); // Set up so we can synchronise with deactivation. - stunning.arriveAndDeregister(); // Let lease expire, and ensure further ticks complete if we lose the race to unregister. + stunning.forceTermination(); // Let lease expire, and ensure further ticks complete if we lose the race to unregister. singleton.phaser.arriveAndAwaitAdvance(); assertFalse(singleton.isActive); + verifyMetrics(Map.of("activation.count", 2.0, + "activation.millis", 0.0, + "deactivation.count", 2.0, + "deactivation.millis", 0.0, + "is_active", 0.0), + metric); // Singleton is reactivated next tick. singleton.phaser.arriveAndAwaitAdvance(); assertTrue(singleton.isActive); + verifyMetrics(Map.of("activation.count", 3.0, + "activation.millis", 0.0, + "deactivation.count", 2.0, + "deactivation.millis", 0.0, + "is_active", 1.0, + "has_lease", 1.0), + metric); // Manager unregisters remaining singletons on shutdown. curator.deconstruct(); singleton.phaser.arriveAndAwaitAdvance(); assertFalse(singleton.isActive); + verifyMetrics(Map.of("activation.count", 3.0, + "activation.millis", 0.0, + "deactivation.count", 3.0, + "deactivation.millis", 0.0), + metric); } } @Test public void testSingletonsInSameContainer() { try (Curator wrapped = new MockCurator()) { - CuratorWrapper curator = new CuratorWrapper(wrapped); + MockMetric metric = new MockMetric(); + CuratorWrapper curator = new CuratorWrapper(wrapped, metric); // First singleton to register becomes active during construction. Singleton singleton = new Singleton(curator); @@ -154,6 +184,13 @@ public class CuratorWrapperTest { assertFalse(newerSingleton.isActive); assertTrue(newSingleton.isActive); assertFalse(singleton.isActive); + verifyMetrics(Map.of("activation.count", 4.0, + "activation.millis", 0.0, + "deactivation.count", 3.0, + "deactivation.millis", 0.0, + "is_active", 1.0, + "has_lease", 1.0), + metric); // Add a singleton which fails activation. Phaser stunning = new Phaser(2); @@ -180,11 +217,36 @@ public class CuratorWrapperTest { stunning.arriveAndAwaitAdvance(); // Failing component is about to be deactivated. assertFalse(newSingleton.isActive); assertTrue(curator.isActive(newSingleton.id())); // No actual active components, but container has the lease. + verifyMetrics(Map.of("activation.count", 5.0, + "activation.millis", 0.0, + "activation.failure.count", 1.0, + "deactivation.count", 5.0, + "deactivation.millis", 0.0, + "is_active", 0.0, + "has_lease", 1.0), + metric); stunning.arriveAndAwaitAdvance(); // Failing component is done being deactivated. stunning.arriveAndAwaitAdvance(); // Failing component is done cleaning up after itself. assertTrue(newSingleton.isActive); assertEquals("failed to register failing singleton", thrownMessage.get()); + verifyMetrics(Map.of("activation.count", 6.0, + "activation.millis", 0.0, + "activation.failure.count", 1.0, + "deactivation.count", 5.0, + "deactivation.millis", 0.0, + "is_active", 1.0, + "has_lease", 1.0), + metric); + newSingleton.shutdown(); + verifyMetrics(Map.of("activation.count", 6.0, + "activation.millis", 0.0, + "activation.failure.count", 1.0, + "deactivation.count", 6.0, + "deactivation.millis", 0.0, + "is_active", 0.0, + "has_lease", 0.0), + metric); curator.deconstruct(); } @@ -193,7 +255,8 @@ public class CuratorWrapperTest { @Test public void testSingletonsInDifferentContainers() { try (MockCurator wrapped = new MockCurator()) { - CuratorWrapper curator = new CuratorWrapper(wrapped, Clock.systemUTC(), Duration.ofMillis(100)); + MockMetric metric = new MockMetric(); + CuratorWrapper curator = new CuratorWrapper(wrapped, Clock.systemUTC(), Duration.ofMillis(100), metric); // Simulate a different container holding the lock. Singleton singleton; @@ -201,12 +264,18 @@ public class CuratorWrapperTest { singleton = new Singleton(curator); assertFalse(singleton.isActive); assertFalse(curator.isActive(singleton.id())); + assertEquals(Map.of(), metric.metrics()); singleton.phaser.register(); } singleton.phaser.arriveAndAwaitAdvance(); assertTrue(curator.isActive(singleton.id())); assertTrue(singleton.isActive); + verifyMetrics(Map.of("activation.count", 1.0, + "activation.millis", 0.0, + "is_active", 1.0, + "has_lease", 1.0), + metric); // Simulate a different container wanting the lock. Phaser stunning = new Phaser(2); @@ -222,14 +291,36 @@ public class CuratorWrapperTest { stunning.arriveAndAwaitAdvance(); singleton.phaser.arriveAndAwaitAdvance(); assertFalse(singleton.isActive); + verifyMetrics(Map.of("activation.count", 1.0, + "activation.millis", 0.0, + "deactivation.count", 1.0, + "deactivation.millis", 0.0, + "is_active", 0.0, + "has_lease", 0.0), + metric); // Connection is restored, and the other container releases the lock again. stunning.arriveAndAwaitAdvance(); singleton.phaser.arriveAndAwaitAdvance(); assertTrue(singleton.isActive); + verifyMetrics(Map.of("activation.count", 2.0, + "activation.millis", 0.0, + "deactivation.count", 1.0, + "deactivation.millis", 0.0, + "is_active", 1.0, + "has_lease", 1.0), + metric); + singleton.phaser.arriveAndDeregister(); singleton.shutdown(); assertFalse(singleton.isActive); + verifyMetrics(Map.of("activation.count", 2.0, + "activation.millis", 0.0, + "deactivation.count", 2.0, + "deactivation.millis", 0.0, + "is_active", 0.0, + "has_lease", 0.0), + metric); curator.deconstruct(); } @@ -245,4 +336,8 @@ public class CuratorWrapperTest { public void shutdown() { unregister(Duration.ofSeconds(2)); } } + static void verifyMetrics(Map<String, Double> expected, MockMetric metrics) { + expected.forEach((metric, value) -> assertEquals(metric, value, metrics.metrics().get("singleton." + metric).get(Map.of("singletonId", "singleton")))); + } + } |