summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-05 17:29:53 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-05 17:29:53 +0100
commitafdeeb2ab52af526a1225547a9ce469c8e7cbabc (patch)
tree116f1c00233037ab5af0dd3e56b2b20819304197 /clustercontroller-reindexer
parentb71490c769a7a2660622f1bc162daeecbae29d51 (diff)
Test setup in ReindexingMaintainer
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java18
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java138
2 files changed, 147 insertions, 9 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 764dca9dfda..7ca98f9a107 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -16,7 +16,6 @@ import com.yahoo.net.HostName;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
import com.yahoo.vespa.curator.Curator;
-import com.yahoo.vespa.curator.Lock;
import com.yahoo.vespa.zookeeper.VespaZooKeeperServer;
import java.time.Clock;
@@ -27,6 +26,7 @@ import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
@@ -68,8 +68,8 @@ public class ReindexingMaintainer extends AbstractComponent {
clock);
this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
- scheduleStaggered(this::maintain, executor, Duration.ofMinutes(1), clock.instant(),
- HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
+ scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS),
+ Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist());
}
private void maintain() {
@@ -104,8 +104,8 @@ public class ReindexingMaintainer extends AbstractComponent {
typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis())));
}
- /** Schedules the given task with the given interval (across all containers in this ZK cluster). */
- static void scheduleStaggered(Runnable task, ScheduledExecutorService executor,
+ /** Schedules a task with the given interval (across all containers in this ZK cluster). */
+ static void scheduleStaggered(BiConsumer<Long, Long> scheduler,
Duration interval, Instant now,
String hostname, String clusterHostnames) {
long delayMillis = 0;
@@ -116,13 +116,13 @@ public class ReindexingMaintainer extends AbstractComponent {
if (hostnames.contains(hostname)) {
long offset = hostnames.indexOf(hostname) * intervalMillis;
intervalMillis *= hostnames.size();
- delayMillis = Math.floorMod(offset - now.toEpochMilli(), interval.toMillis());
+ delayMillis = Math.floorMod(offset - now.toEpochMilli(), intervalMillis);
}
- executor.scheduleAtFixedRate(task, delayMillis, intervalMillis, TimeUnit.MILLISECONDS);
+ scheduler.accept(delayMillis, intervalMillis);
}
- private static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets,
- DocumentTypeManager manager) {
+ static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets,
+ DocumentTypeManager manager) {
return clusters.storage().stream()
.filter(storage -> storage.name().equals(name))
.map(storage -> new Cluster(name,
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
new file mode 100644
index 00000000000..a64da00f3f1
--- /dev/null
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java
@@ -0,0 +1,138 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.reindexing;
+
+import ai.vespa.reindexing.Reindexer.Cluster;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.searchdefinition.derived.Deriver;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.vespa.config.content.reindexing.ReindexingConfig;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+
+import static ai.vespa.reindexing.ReindexingMaintainer.parseCluster;
+import static ai.vespa.reindexing.ReindexingMaintainer.parseReady;
+import static ai.vespa.reindexing.ReindexingMaintainer.scheduleStaggered;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * @author jonmv
+ */
+class ReindexingMaintainerTest {
+
+ @Test
+ void testParsing() {
+ DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
+ DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
+
+ assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)),
+ parseReady(new ReindexingConfig.Builder()
+ .enabled(true)
+ .clusterName("cluster")
+ .status("music", new ReindexingConfig.Status.Builder().readyAtMillis(123))
+ .build(),
+ manager));
+
+ // Unknown document type fails
+ assertThrows(IllegalArgumentException.class,
+ () -> parseReady(new ReindexingConfig.Builder()
+ .status("poetry", new ReindexingConfig.Status.Builder().readyAtMillis(123))
+ .build(),
+ manager));
+
+ assertEquals(new Cluster("cluster", "configId", Map.of(manager.getDocumentType("music"), "default")),
+ parseCluster("cluster",
+ new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("oyster")
+ .configid("configId"))
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("cluster")
+ .configid("configId"))
+ .build(),
+ new AllClustersBucketSpacesConfig.Builder()
+ .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("global")))
+ .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("default")))
+ .build(),
+ manager));
+
+ // Cluster missing in bucket space list fails.
+ assertThrows(IllegalArgumentException.class,
+ () -> parseCluster("cluster",
+ new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("cluster")
+ .configid("configId"))
+ .build(),
+ new AllClustersBucketSpacesConfig.Builder()
+ .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("global")))
+ .build(),
+ manager));
+
+ // Cluster missing in cluster list fails.
+ assertThrows(IllegalArgumentException.class,
+ () -> parseCluster("cluster",
+ new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder()
+ .name("oyster")
+ .configid("configId"))
+ .build(),
+ new AllClustersBucketSpacesConfig.Builder()
+ .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace("default")))
+ .build(),
+ manager));
+ }
+
+ @Test
+ void testStaggering() {
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(0, delayMillis);
+ assertEquals(10, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(27),
+ "host",
+ "nys:123,hark:123");
+
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(3, delayMillis);
+ assertEquals(10, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(27),
+ "host",
+ "host:123");
+
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(7, delayMillis);
+ assertEquals(20, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(13),
+ "host",
+ "host:123,:nys:321");
+
+ scheduleStaggered((delayMillis, intervalMillis) -> {
+ assertEquals(17, delayMillis);
+ assertEquals(20, intervalMillis);
+ },
+ Duration.ofMillis(10),
+ Instant.ofEpochMilli(13),
+ "nys",
+ "host:123,nys:321");
+ }
+
+}