diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-01-14 10:55:23 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-01-14 10:55:23 +0100 |
commit | b0b1736459aedca891d0ddb355e922d5e5c2c299 (patch) | |
tree | edca6a3d93b741b594398f9bcd3860ae4e9b54fa /configserver | |
parent | f0dda9a306ecb000be56512732c620f5d7c5b3f1 (diff) |
Let maintainer remove data for no-longer-existent types
Diffstat (limited to 'configserver')
3 files changed, 94 insertions, 3 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java index cb81bed155e..2828b4c62e0 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java @@ -11,7 +11,6 @@ import java.util.Optional; import java.util.stream.Stream; import static java.util.Objects.requireNonNull; -import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableMap; /** @@ -79,12 +78,32 @@ public class ApplicationReindexing implements Reindexing { /** Returns a copy of this with no pending reindexing for the given document type. */ public ApplicationReindexing withoutPending(String cluster, String documentType) { Cluster current = clusters.getOrDefault(cluster, Cluster.ready(common)); + if (current == null) + return this; + Cluster modified = new Cluster(current.common, without(documentType, current.pending), current.ready); return new ApplicationReindexing(enabled, common, with(cluster, modified, clusters)); } + /** Returns a copy of this without the given cluster. */ + public ApplicationReindexing without(String cluster) { + return new ApplicationReindexing(enabled, common, without(cluster, clusters)); + } + + /** Returns a copy of this without the given document type in the given cluster. */ + public ApplicationReindexing without(String cluster, String documentType) { + Cluster current = clusters.get(cluster); + if (current == null) + return this; + + Cluster modified = new Cluster(current.common, + current.pending, + without(documentType, current.ready)); + return new ApplicationReindexing(enabled, common, with(cluster, modified, clusters)); + } + /** Returns a copy of this with the enabled-state set to the given value. */ public ApplicationReindexing enabled(boolean enabled) { return new ApplicationReindexing(enabled, common, clusters); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java index 22eb95261bd..a8290b55874 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainer.java @@ -6,25 +6,32 @@ import com.yahoo.vespa.config.server.ApplicationRepository; import com.yahoo.vespa.config.server.application.Application; import com.yahoo.vespa.config.server.application.ApplicationCuratorDatabase; import com.yahoo.vespa.config.server.application.ApplicationReindexing; +import com.yahoo.vespa.config.server.application.ApplicationReindexing.Cluster; import com.yahoo.vespa.config.server.application.ConfigConvergenceChecker; import com.yahoo.vespa.config.server.tenant.Tenant; import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.model.VespaModel; +import com.yahoo.vespa.model.content.cluster.ContentCluster; import com.yahoo.yolean.Exceptions; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collection; import java.util.Comparator; +import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * Watches pending reindexing, and sets these to ready when config convergence is observed. + * Also removes data for clusters or document types which no longer exist. * * @author jonmv */ @@ -55,8 +62,11 @@ public class ReindexingMaintainer extends ConfigServerMaintainer { .map(application -> application.getForVersionOrLatest(Optional.empty(), clock.instant())) .ifPresent(application -> { try { - applicationRepository.modifyReindexing(id, reindexing -> - withNewReady(reindexing, lazyGeneration(application), clock.instant())); + applicationRepository.modifyReindexing(id, reindexing -> { + reindexing = withNewReady(reindexing, lazyGeneration(application), clock.instant()); + reindexing = withOnlyCurrentData(reindexing, application); + return reindexing; + }); } catch (RuntimeException e) { log.log(Level.INFO, "Failed to update reindexing status for " + id + ": " + Exceptions.toMessageString(e)); @@ -90,4 +100,33 @@ public class ReindexingMaintainer extends ConfigServerMaintainer { return reindexing; } + static ApplicationReindexing withOnlyCurrentData(ApplicationReindexing reindexing, Application application) { + return withOnlyCurrentData(reindexing, clusterDocumentTypes(application)); + } + + static ApplicationReindexing withOnlyCurrentData(ApplicationReindexing reindexing, Map<String, ? extends Collection<String>> clusterDocumentTypes) { + for (String clusterId : reindexing.clusters().keySet()) { + if ( ! clusterDocumentTypes.containsKey(clusterId)) + reindexing = reindexing.without(clusterId); + else { + Cluster cluster = reindexing.clusters().get(clusterId); + Collection<String> documentTypes = clusterDocumentTypes.get(clusterId); + for (String pending : cluster.pending().keySet()) + if ( ! documentTypes.contains(pending)) + reindexing = reindexing.withoutPending(clusterId, pending); + for (String ready : cluster.ready().keySet()) + if ( ! documentTypes.contains(ready)) + reindexing = reindexing.without(clusterId, ready); + } + } + return reindexing; + } + + static Map<String, Collection<String>> clusterDocumentTypes(Application application) { + Map<String, ContentCluster> contentClusters = ((VespaModel) application.getModel()).getContentClusters(); + return contentClusters.entrySet().stream() + .collect(Collectors.toMap(cluster -> cluster.getKey(), + cluster -> cluster.getValue().getDocumentDefinitions().keySet())); + } + } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java index d75b91f45e3..d0a4cd59dbd 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/maintenance/ReindexingMaintainerTest.java @@ -5,8 +5,11 @@ import com.yahoo.vespa.config.server.application.ApplicationReindexing; import org.junit.Test; import java.time.Instant; +import java.util.List; +import java.util.Map; import static com.yahoo.vespa.config.server.maintenance.ReindexingMaintainer.withNewReady; +import static com.yahoo.vespa.config.server.maintenance.ReindexingMaintainer.withOnlyCurrentData; import static org.junit.Assert.assertEquals; /** @@ -46,4 +49,34 @@ public class ReindexingMaintainerTest { later); } + @Test + public void testGarbageRemoval() { + ApplicationReindexing reindexing = ApplicationReindexing.ready(Instant.EPOCH) + .withPending("one", "a", 10) + .withPending("two", "b", 20) + .withReady("one", "a", Instant.ofEpochMilli(3)) + .withReady("two", "b", Instant.ofEpochMilli(2)) + .withReady("two", "c", Instant.ofEpochMilli(3)); + + assertEquals(reindexing, + withOnlyCurrentData(reindexing, Map.of("one", List.of("a", "b", "c", "d"), + "two", List.of("b", "c"), + "three", List.of("a", "b")))); + + assertEquals(reindexing, + withOnlyCurrentData(reindexing, Map.of("one", List.of("a"), + "two", List.of("b", "c")))); + + assertEquals(ApplicationReindexing.ready(Instant.EPOCH) + .withPending("two", "b", 20) + .withReady("two", "b", Instant.ofEpochMilli(2)), + withOnlyCurrentData(reindexing, Map.of("two", List.of("a", "b")))); + + assertEquals(ApplicationReindexing.ready(Instant.EPOCH) + .withReady("one", Instant.EPOCH) + .withReady("two", "c", Instant.ofEpochMilli(3)), + withOnlyCurrentData(reindexing, Map.of("one", List.of("c"), + "two", List.of("c")))); + } + } |