summary | refs | log | tree | commit | diff | stats
path: root/clustercontroller-reindexer/src
diff options
context:
space:
mode:
author Jon Marius Venstad <venstad@gmail.com> 2020-11-16 13:27:10 +0100
committer Jon Marius Venstad <venstad@gmail.com> 2020-11-19 11:04:46 +0100
commit 5b110acc79bee12f6a3243d1e78697d5b11f5dcb (patch)
tree 6075399d23ed2f423c70c8501f054e00fa252ef5 /clustercontroller-reindexer/src
parent 4de0fc18bbe12a1646bfda2b505cef0d2fbe99c1 (diff)
Write metrics regularly when doing reindexing
Diffstat (limited to 'clustercontroller-reindexer/src')
-rw-r--r-- clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java 12
-rw-r--r-- clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java 2
-rw-r--r-- clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java 7
-rw-r--r-- clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMetrics.java 47
-rw-r--r-- clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java 37
5 files changed, 87 insertions, 18 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index dbed92a12b3..78d6a583c24 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -11,6 +11,7 @@ import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.Metric;
import com.yahoo.vespa.curator.Lock;
import java.time.Clock;
@@ -44,6 +45,7 @@ public class Reindexer {
private final Map<DocumentType, Instant> ready;
private final ReindexingCurator database;
private final Function<VisitorParameters, Runnable> visitorSessions;
+ private final ReindexingMetrics metrics;
private final Clock clock;
private final Phaser phaser = new Phaser(2); // Reindexer and visitor.
@@ -52,7 +54,7 @@ public class Reindexer {
@Inject
public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
- DocumentAccess access, Clock clock) {
+ DocumentAccess access, Metric metric, Clock clock) {
this(cluster,
ready,
database,
@@ -64,11 +66,12 @@ public class Reindexer {
throw new IllegalStateException(e);
}
},
+ metric,
clock);
}
Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
- Function<VisitorParameters, Runnable> visitorSessions, Clock clock) {
+ Function<VisitorParameters, Runnable> visitorSessions, Metric metric, Clock clock) {
for (DocumentType type : ready.keySet())
cluster.bucketSpaceOf(type); // Verifies this is known.
@@ -116,6 +119,7 @@ public class Reindexer {
status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required.
database.writeReindexing(reindexing = reindexing.with(type, status));
+ metrics.dump(reindexing);
switch (status.state()) {
default:
@@ -141,6 +145,7 @@ public class Reindexer {
if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) {
progressLastStored.set(clock.instant());
database.writeReindexing(reindexing = reindexing.with(type, status));
+ metrics.dump(reindexing);
}
}
@Override
@@ -173,7 +178,8 @@ public class Reindexer {
log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant()));
status = status.successful(clock.instant());
}
- database.writeReindexing(reindexing.with(type, status));
+ database.writeReindexing(reindexing = reindexing.with(type, status));
+ metrics.dump(reindexing);
}
VisitorParameters createParameters(DocumentType type, ProgressToken progress) {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
index 792889e4aa8..51322c37a7d 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexing.java
@@ -121,7 +121,7 @@ public class Reindexing {
public Status failed(Instant now, String message) { // Builds the FAILED status that follows this one; now and message must be non-null.
if (state != State.RUNNING) // Only a RUNNING status may change to FAILED.
throw new IllegalStateException("Current state must be RUNNING when changing to FAILED");
- return new Status(startedAt, requireNonNull(now), null, State.FAILED, requireNonNull(message));
+ return new Status(startedAt, requireNonNull(now), progress, State.FAILED, requireNonNull(message)); // Passes on the current progress where the removed line passed null.
}
public Instant startedAt() {
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 740a04619d1..a336ad02f20 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -12,6 +12,7 @@ import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.jdisc.Metric;
import com.yahoo.net.HostName;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
@@ -52,13 +53,14 @@ public class ReindexingMaintainer extends AbstractComponent {
@Inject
public ReindexingMaintainer(@SuppressWarnings("unused") VespaZooKeeperServer ensureZkHasStarted,
+ Metric metric,
DocumentAccess access, ZookeepersConfig zookeepersConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
- this(Clock.systemUTC(), access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig);
+ this(Clock.systemUTC(), metric, access, zookeepersConfig, clusterListConfig, allClustersBucketSpacesConfig, reindexingConfig, documentmanagerConfig);
}
- ReindexingMaintainer(Clock clock, DocumentAccess access, ZookeepersConfig zookeepersConfig,
+ ReindexingMaintainer(Clock clock, Metric metric, DocumentAccess access, ZookeepersConfig zookeepersConfig,
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig);
@@ -68,6 +70,7 @@ public class ReindexingMaintainer extends AbstractComponent {
reindexingConfig.clusterName(),
manager),
access,
+ metric,
clock);
this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMetrics.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMetrics.java
new file mode 100644
index 00000000000..5a5a866f153
--- /dev/null
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMetrics.java
@@ -0,0 +1,47 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.jdisc.Metric;
+
+import java.time.Clock;
+import java.util.Map;
+
+import static ai.vespa.reindexing.Reindexing.State.SUCCESSFUL;
+
+/**
+ * Metrics for reindexing in a content cluster: {@code dump} sets one "reindexing.percent.done" value per document type on the given {@link Metric}.
+ *
+ * @author jonmv
+ */
+class ReindexingMetrics {
+
+ private final Metric metric; // Receives the values and creates the dimension contexts they are set with.
+ private final String cluster; // Used as the "clusterid" dimension of every value set.
+
+ ReindexingMetrics(Metric metric, String cluster) { // Stores both arguments as given; nothing is validated here.
+ this.metric = metric;
+ this.cluster = cluster;
+ }
+
+ void dump(Reindexing reindexing) { // Sets "reindexing.percent.done" once for each (type, status) entry of the given reindexing.
+ reindexing.status().forEach((type, status) -> {
+ metric.set("reindexing.percent.done",
+ status.progress().map(ProgressToken::percentFinished).orElse(status.state() == SUCCESSFUL ? 100.0 : 0.0), // Stored progress wins; without it, SUCCESSFUL gives 100.0 and any other state 0.0.
+ metric.createContext(Map.of("clusterid", cluster, // Dimensions of the value: clusterid, documenttype and state.
+ "documenttype", type.getName(),
+ "state", toString(status.state()))));
+ });
+ }
+
+ private static String toString(Reindexing.State state) { // Lower-case name of the state, used as the "state" dimension value.
+ switch (state) {
+ case READY: return "ready";
+ case RUNNING: return "running";
+ case FAILED: return "failed";
+ case SUCCESSFUL: return "successful";
+ default: throw new IllegalArgumentException("Unknown reindexing state '" + state + "'"); // Any state not listed above is rejected rather than given a name.
+ }
+ }
+
+}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index a5ad2ba32f1..9e3992c4d0f 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -4,7 +4,6 @@ package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexer.Cluster;
import ai.vespa.reindexing.Reindexing.Status;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
-import com.yahoo.document.Document;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.config.DocumentmanagerConfig;
@@ -12,6 +11,7 @@ import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.test.MockMetric;
import com.yahoo.searchdefinition.derived.Deriver;
import com.yahoo.test.ManualClock;
import com.yahoo.vespa.curator.mock.MockCurator;
@@ -40,13 +40,13 @@ import static org.junit.jupiter.api.Assertions.fail;
*/
class ReindexerTest {
- static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> { fail("Not supposed to run"); };
+ static final Function<VisitorParameters, Runnable> failIfCalled = __ -> () -> fail("Not supposed to run");
final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
final DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
final DocumentType music = manager.getDocumentType("music");
- final Document document1 = new Document(music, "id:ns:music::one");
final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default"));
+ final MockMetric metric = new MockMetric();
final ManualClock clock = new ManualClock(Instant.EPOCH);
ReindexingCurator database;
@@ -63,12 +63,13 @@ class ReindexerTest {
Map.of(music, Instant.EPOCH),
database,
failIfCalled,
+ metric,
clock));
}
@Test
void throwsWhenLockHeldElsewhere() throws InterruptedException, ExecutionException {
- Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock);
Executors.newSingleThreadExecutor().submit(database::lockReindexing).get();
assertThrows(ReindexingLockException.class, reindexer::reindex);
}
@@ -76,12 +77,13 @@ class ReindexerTest {
@Test
@Timeout(10)
void nothingToDoWithEmptyConfig() throws ReindexingLockException {
- new Reindexer(cluster, Map.of(), database, failIfCalled, clock).reindex();
+ new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock).reindex();
+ assertEquals(Map.of(), metric.metrics());
}
@Test
void testParameters() {
- Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, clock);
+ Reindexer reindexer = new Reindexer(cluster, Map.of(), database, failIfCalled, metric, clock);
ProgressToken token = new ProgressToken();
VisitorParameters parameters = reindexer.createParameters(music, token);
assertEquals("music:[document]", parameters.getFieldSet());
@@ -98,14 +100,19 @@ class ReindexerTest {
void testReindexing() throws ReindexingLockException {
// Reindexer is told to update "music" documents no earlier than EPOCH, which is just now.
// Since "music" is a new document type, it is stored as just reindexed, and nothing else happens.
- new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, clock).reindex();
+ new Reindexer(cluster, Map.of(music, Instant.EPOCH), database, failIfCalled, metric, clock).reindex();
Reindexing reindexing = Reindexing.empty().with(music, Status.ready(Instant.EPOCH).running().successful(Instant.EPOCH));
assertEquals(reindexing, database.readReindexing());
+ assertEquals(Map.of("reindexing.percent.done", Map.of(Map.of("documenttype", "music",
+ "clusterid", "cluster",
+ "state", "successful"),
+ 100.0)),
+ metric.metrics());
// New config tells reindexer to reindex "music" documents no earlier than at 10 millis after EPOCH, which isn't yet.
// Nothing happens, since it's not yet time. This isn't supposed to happen unless high clock skew.
clock.advance(Duration.ofMillis(5));
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, clock).reindex();
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(10)), database, failIfCalled, metric, clock).reindex();
assertEquals(reindexing, database.readReindexing());
// It's time to reindex the "music" documents — let this complete successfully.
@@ -116,13 +123,14 @@ class ReindexerTest {
database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
return () -> shutDown.set(true);
- }, clock).reindex();
+ }, metric, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());
assertTrue(shutDown.get(), "Session was shut down");
// One more reindexing, this time shut down before visit completes, but after progress is reported.
clock.advance(Duration.ofMillis(10));
+ metric.metrics().clear();
shutDown.set(false);
AtomicReference<Reindexer> aborted = new AtomicReference<>();
aborted.set(new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, parameters -> {
@@ -133,11 +141,16 @@ class ReindexerTest {
shutDown.set(true);
parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Shut down");
};
- }, clock));
+ }, metric, clock));
aborted.get().reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
assertEquals(reindexing, database.readReindexing());
assertTrue(shutDown.get(), "Session was shut down");
+ assertEquals(Map.of("reindexing.percent.done", Map.of(Map.of("documenttype", "music",
+ "clusterid", "cluster",
+ "state", "ready"),
+ 100.0)), // new ProgressToken() is 100% done.
+ metric.metrics());
// Last reindexing fails.
clock.advance(Duration.ofMillis(10));
@@ -146,13 +159,13 @@ class ReindexerTest {
database.writeReindexing(Reindexing.empty()); // Wipe database to verify we write data from reindexer.
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "Error"));
return () -> shutDown.set(true);
- }, clock).reindex();
+ }, metric, clock).reindex();
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().failed(clock.instant(), "Error"));
assertEquals(reindexing, database.readReindexing());
assertTrue(shutDown.get(), "Session was shut down");
// Document type is ignored in next run, as it has failed fatally.
- new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, clock).reindex();
+ new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(30)), database, failIfCalled, metric, clock).reindex();
assertEquals(reindexing, database.readReindexing());
}