diff options
5 files changed, 87 insertions, 18 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index dbed92a12b3..78d6a583c24 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -11,6 +11,7 @@ import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.Metric; import com.yahoo.vespa.curator.Lock; import java.time.Clock; @@ -44,6 +45,7 @@ public class Reindexer { private final Map<DocumentType, Instant> ready; private final ReindexingCurator database; private final Function<VisitorParameters, Runnable> visitorSessions; + private final ReindexingMetrics metrics; private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. @@ -52,7 +54,7 @@ public class Reindexer { @Inject public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - DocumentAccess access, Clock clock) { + DocumentAccess access, Metric metric, Clock clock) { this(cluster, ready, database, @@ -64,11 +66,12 @@ public class Reindexer { throw new IllegalStateException(e); } }, + metric, clock); } Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, - Function<VisitorParameters, Runnable> visitorSessions, Clock clock) { + Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) { for (DocumentType type : ready.keySet()) cluster.bucketSpaceOf(type); // Verifies this is known. @@ -116,6 +119,7 @@ public class Reindexer { status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required. 
database.writeReindexing(reindexing = reindexing.with(type, status)); + metrics.dump(reindexing); switch (status.state()) { default: @@ -141,6 +145,7 @@ public class Reindexer { if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) { progressLastStored.set(clock.instant()); database.writeReindexing(reindexing = reindexing.with(type, status)); + metrics.dump(reindexing); } } @Override @@ -173,7 +178,8 @@ public class Reindexer { log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant())); status = status.successful(clock.instant()); } - database.writeReindexing(reindexing.with(type, status)); + database.writeReindexing(reindexing = reindexing.with(type, status)); + metrics.dump(reindexing); } VisitorParameters createParameters(DocumentType type, ProgressToken progress) { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java index 792889e4aa8..51322c37a7d 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java @@ -121,7 +121,7 @@ public class Reindexing { public Status failed(Instant now, String message) { if (state != State.RUNNING) throw new IllegalStateException("Current state must be RUNNING when changing to FAILED"); - return new Status(startedAt, requireNonNull(now), null, State.FAILED, requireNonNull(message)); + return new Status(startedAt, requireNonNull(now), progress, State.FAILED, requireNonNull(message)); } public Instant startedAt() { diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 740a04619d1..a336ad02f20 100644 --- 
a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -12,6 +12,7 @@ import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.jdisc.Metric; import com.yahoo.net.HostName; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; @@ -52,13 +53,14 @@ public class ReindexingMaintainer extends AbstractComponent { @Inject public ReindexingMaintainer(@SuppressWarnings("unused") VespaZooKeeperServer ensureZkHasStarted, + Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { - this(Clock.systemUTC(), access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig); + this(Clock.systemUTC(), metric, access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig); } - ReindexingMaintainer(Clock clock, DocumentAccess access, ZookeepersConfig zookeepersConfig, + ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig, ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig); @@ -68,6 +70,7 @@ public class ReindexingMaintainer extends AbstractComponent { reindexingConfig.clusterName(), manager), access, + metric, clock); this.executor = new ScheduledThreadPoolExecutor(1, new 
DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMetrics.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMetrics.java new file mode 100644 index 00000000000..5a5a866f153 --- /dev/null +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMetrics.java @@ -0,0 +1,47 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.jdisc.Metric; + +import java.time.Clock; +import java.util.Map; + +import static ai.vespa.reindexing.Reindexing.State.SUCCESSFUL; + +/** + * Metrics for reindexing in a content cluster. + * + * @author jonmv + */ +class ReindexingMetrics { + + private final Metric metric; + private final String cluster; + + /** Stores the metric sink and the cluster name; the name is attached to every sample as the "clusterid" dimension. */ ReindexingMetrics(Metric metric, String cluster) { + this.metric = metric; + this.cluster = cluster; + } + + /** Sets the "reindexing.percent.done" metric once for each (document type, status) entry of the given reindexing. The value is the progress token's percentFinished() when the status has progress; without progress it is 100.0 for a SUCCESSFUL status and 0.0 for any other state. Each sample gets a context with the dimensions "clusterid", "documenttype" and "state". NOTE(review): only the context for the current state is written, so after a state change the sample under the previous "state" value is left as last set by this class; confirm how consumers of the metric treat that. */ void dump(Reindexing reindexing) { + reindexing.status().forEach((type, status) -> { + metric.set("reindexing.percent.done", + status.progress().map(ProgressToken::percentFinished).orElse(status.state() == SUCCESSFUL ? 
100.0 : 0.0), + metric.createContext(Map.of("clusterid", cluster, + "documenttype", type.getName(), + "state", toString(status.state())))); + }); + } + + /** Returns the lower-case label used for the "state" dimension (READY, RUNNING, FAILED, SUCCESSFUL); any other state is rejected with an IllegalArgumentException. */ private static String toString(Reindexing.State state) { + switch (state) { + case READY: return "ready"; + case RUNNING: return "running"; + case FAILED: return "failed"; + case SUCCESSFUL: return "successful"; + default: throw new IllegalArgumentException("Unknown reindexing state '" + state + "'"); + } + } + +} diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index a5ad2ba32f1..9e3992c4d0f 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -4,7 +4,6 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexer.Cluster; import ai.vespa.reindexing.Reindexing.Status; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; -import com.yahoo.document.Document; import com.yahoo.document.DocumentType; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.config.DocumentmanagerConfig; @@ -12,6 +11,7 @@ import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.test.MockMetric; import com.yahoo.searchdefinition.derived.Deriver; import com.yahoo.test.ManualClock; import com.yahoo.vespa.curator.mock.MockCurator; @@ -40,13 +40,13 @@ import static org.junit.jupiter.api.Assertions.fail; */ class ReindexerTest { - static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> { fail("Not supposed to run"); }; + static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> fail("Not supposed to run"); final DocumentmanagerConfig 
musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); final DocumentType music = manager.getDocumentType("music"); - final Document document1 = new Document(music, "id:ns:music::one"); final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); + final MockMetric metric = new MockMetric(); final ManualClock clock = new ManualClock(Instant.EPOCH); ReindexingCurator database; @@ -63,12 +63,13 @@ class ReindexerTest { Map.of(music, Instant.EPOCH), database, failIfCalled, + metric, clock)); } @Test void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException { - Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock); Executors.newSingleThreadExecutor().submit(database::lockReindexing).get(); assertThrows(ReindexingLockException.class, reindexer::reindex); } @@ -76,12 +77,13 @@ class ReindexerTest { @Test @Timeout(10) void nothingToDoWithEmptyConfig() throws ReindexingLockException { - new Reindexer(cluster, Map.of(), database, failIfCalled, clock).reindex(); + new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock).reindex(); + assertEquals(Map.of(), metric.metrics()); } @Test void testParameters() { - Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, clock); + Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock); ProgressToken token = new ProgressToken(); VisitorParameters parameters = reindexer.createParameters(music, token); assertEquals("music:[document]", parameters.getFieldSet()); @@ -98,14 +100,19 @@ class ReindexerTest { void testReindexing() throws ReindexingLockException { // Reindexer is told to update "music" documents no earlier than EPOCH, which is 
just now. // Since "music" is a new document type, it is stored as just reindexed, and nothing else happens. - new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex(); Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH)); assertEquals(reindexing, database.readReindexing()); + assertEquals(Map.of("reindexing.percent.done", Map.of(Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "successful"), + 100.0)), + metric.metrics()); // New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet. // Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew. clock.advance(Duration.ofMillis(5)); - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex(); assertEquals(reindexing, database.readReindexing()); // It's time to reindex the "music" documents — let this complete successfully. @@ -116,13 +123,14 @@ class ReindexerTest { database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); return () -> shutDown.set(true); - }, clock).reindex(); + }, metric, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); assertTrue(shutDown.get(), "Session was shut down"); // One more reindexing, this time shut down before visit completes, but after progress is reported. 
clock.advance(Duration.ofMillis(10)); + metric.metrics().clear(); shutDown.set(false); AtomicReference<Reindexer> aborted = new AtomicReference<>(); aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> { @@ -133,11 +141,16 @@ class ReindexerTest { shutDown.set(true); parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down"); }; - }, clock)); + }, metric, clock)); aborted.get().reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); assertEquals(reindexing, database.readReindexing()); assertTrue(shutDown.get(), "Session was shut down"); + assertEquals(Map.of("reindexing.percent.done", Map.of(Map.of("documenttype", "music", + "clusterid", "cluster", + "state", "ready"), + 100.0)), // new ProgressToken() is 100% done. + metric.metrics()); // Last reindexing fails. clock.advance(Duration.ofMillis(10)); @@ -146,13 +159,13 @@ class ReindexerTest { database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer. executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error")); return () -> shutDown.set(true); - }, clock).reindex(); + }, metric, clock).reindex(); reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error")); assertEquals(reindexing, database.readReindexing()); assertTrue(shutDown.get(), "Session was shut down"); // Document type is ignored in next run, as it has failed fatally. - new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, clock).reindex(); + new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex(); assertEquals(reindexing, database.readReindexing()); } |