diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-05 17:29:53 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-05 17:29:53 +0100 |
commit | afdeeb2ab52af526a1225547a9ce469c8e7cbabc (patch) | |
tree | 116f1c00233037ab5af0dd3e56b2b20819304197 /clustercontroller-reindexer | |
parent | b71490c769a7a2660622f1bc162daeecbae29d51 (diff) |
Test setup in ReindexingMaintainer
Diffstat (limited to 'clustercontroller-reindexer')
2 files changed, 147 insertions, 9 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 764dca9dfda..7ca98f9a107 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -16,7 +16,6 @@ import com.yahoo.net.HostName; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; import com.yahoo.vespa.curator.Curator; -import com.yahoo.vespa.curator.Lock; import com.yahoo.vespa.zookeeper.VespaZooKeeperServer; import java.time.Clock; @@ -27,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; @@ -68,8 +68,8 @@ public class ReindexingMaintainer extends AbstractComponent { clock); this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) - scheduleStaggered(this::maintain, executor, Duration.ofMinutes(1), clock.instant(), - HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist()); + scheduleStaggered((delayMillis, intervalMillis) -> executor.scheduleAtFixedRate(this::maintain, delayMillis, intervalMillis, TimeUnit.MILLISECONDS), + Duration.ofMinutes(1), clock.instant(), HostName.getLocalhost(), zookeepersConfig.zookeeperserverlist()); } private void maintain() { @@ -104,8 +104,8 @@ public class ReindexingMaintainer extends AbstractComponent { typeStatus -> Instant.ofEpochMilli(typeStatus.getValue().readyAtMillis()))); } - /** Schedules the given task with the given interval (across all containers in this ZK cluster). */ - static void scheduleStaggered(Runnable task, ScheduledExecutorService executor, + /** Schedules a task with the given interval (across all containers in this ZK cluster). */ + static void scheduleStaggered(BiConsumer<Long, Long> scheduler, Duration interval, Instant now, String hostname, String clusterHostnames) { long delayMillis = 0; @@ -116,13 +116,13 @@ public class ReindexingMaintainer extends AbstractComponent { if (hostnames.contains(hostname)) { long offset = hostnames.indexOf(hostname) * intervalMillis; intervalMillis *= hostnames.size(); - delayMillis = Math.floorMod(offset - now.toEpochMilli(), interval.toMillis()); + delayMillis = Math.floorMod(offset - now.toEpochMilli(), intervalMillis); } - executor.scheduleAtFixedRate(task, delayMillis, intervalMillis, TimeUnit.MILLISECONDS); + scheduler.accept(delayMillis, intervalMillis); } - private static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets, - DocumentTypeManager manager) { + static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets, + DocumentTypeManager manager) { return clusters.storage().stream() .filter(storage -> storage.name().equals(name)) .map(storage -> new Cluster(name, diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java new file mode 100644 index 00000000000..a64da00f3f1 --- /dev/null +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexingMaintainerTest.java @@ -0,0 +1,138 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.reindexing; + +import ai.vespa.reindexing.Reindexer.Cluster; +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.searchdefinition.derived.Deriver; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.vespa.config.content.reindexing.ReindexingConfig; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.time.Instant; +import java.util.Map; + +import static ai.vespa.reindexing.ReindexingMaintainer.parseCluster; +import static ai.vespa.reindexing.ReindexingMaintainer.parseReady; +import static ai.vespa.reindexing.ReindexingMaintainer.scheduleStaggered; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * @author jonmv + */ +class ReindexingMaintainerTest { + + @Test + void testParsing() { + DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); + DocumentTypeManager manager = new DocumentTypeManager(musicConfig); + + assertEquals(Map.of(manager.getDocumentType("music"), Instant.ofEpochMilli(123)), + parseReady(new ReindexingConfig.Builder() + .enabled(true) + .clusterName("cluster") + .status("music", new ReindexingConfig.Status.Builder().readyAtMillis(123)) + .build(), + manager)); + + // Unknown document type fails + assertThrows(IllegalArgumentException.class, + () -> parseReady(new ReindexingConfig.Builder() + .status("poetry", new ReindexingConfig.Status.Builder().readyAtMillis(123)) + .build(), + manager)); + + assertEquals(new Cluster("cluster", "configId", Map.of(manager.getDocumentType("music"), "default")), + parseCluster("cluster", + new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder() + .name("oyster") + .configid("configId")) + .storage(new ClusterListConfig.Storage.Builder() + .name("cluster") + .configid("configId")) + .build(), + new AllClustersBucketSpacesConfig.Builder() + .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("global"))) + .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("default"))) + .build(), + manager)); + + // Cluster missing in bucket space list fails. + assertThrows(IllegalArgumentException.class, + () -> parseCluster("cluster", + new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder() + .name("cluster") + .configid("configId")) + .build(), + new AllClustersBucketSpacesConfig.Builder() + .cluster("oyster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("global"))) + .build(), + manager)); + + // Cluster missing in cluster list fails. + assertThrows(IllegalArgumentException.class, + () -> parseCluster("cluster", + new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder() + .name("oyster") + .configid("configId")) + .build(), + new AllClustersBucketSpacesConfig.Builder() + .cluster("cluster", new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace("default"))) + .build(), + manager)); + } + + @Test + void testStaggering() { + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(0, delayMillis); + assertEquals(10, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(27), + "host", + "nys:123,hark:123"); + + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(3, delayMillis); + assertEquals(10, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(27), + "host", + "host:123"); + + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(7, delayMillis); + assertEquals(20, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(13), + "host", + "host:123,:nys:321"); + + scheduleStaggered((delayMillis, intervalMillis) -> { + assertEquals(17, delayMillis); + assertEquals(20, intervalMillis); + }, + Duration.ofMillis(10), + Instant.ofEpochMilli(13), + "nys", + "host:123,nys:321"); + } + +} |