diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-05 13:24:40 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-05 13:24:40 +0100 |
commit | b71490c769a7a2660622f1bc162daeecbae29d51 (patch) | |
tree | 19255c6d3cde003c74ce8e0143750aa8fb04cf16 /clustercontroller-reindexer | |
parent | 793896757cb554eb20ea32b3c24aced6515b5c14 (diff) |
Clean up and add some javadoc
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java | 49 | ||||
-rw-r--r-- | clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java | 18 |
2 files changed, 40 insertions, 27 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 3e66eea17cc..c668f74f469 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -2,13 +2,13 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexing.Status; +import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; import com.yahoo.document.DocumentType; import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorControlHandler.CompletionCode; -import com.yahoo.documentapi.VisitorControlSession; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; @@ -17,20 +17,23 @@ import com.yahoo.vespa.curator.Lock; import java.time.Clock; import java.time.Duration; import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.time.temporal.TemporalUnit; import java.util.Map; import java.util.TreeMap; import java.util.logging.Logger; import static com.yahoo.documentapi.VisitorControlHandler.CompletionCode.ABORTED; -import static java.time.Instant.EPOCH; -import static java.time.temporal.ChronoUnit.MICROS; import static java.util.Objects.requireNonNull; import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; +/** + * Progresses reindexing efforts by creating visitor sessions against its own content cluster, + * which send documents straight to storage — via indexing if the documenet type has "index" mode. + * The {@link #reindex} method blocks until unterrupted, or util no more reindexing is left to do. + * + * @author jonmv + */ public class Reindexer { private static final Logger log = Logger.getLogger(Reindexer.class.getName()); @@ -54,25 +57,27 @@ public class Reindexer { } /** Starts and tracks reprocessing of ready document types until done, or interrupted. */ - public void reindex(Lock __) { - Reindexing reindexing = database.readReindexing(); - for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. - if (ready.get(type).isAfter(clock.instant())) { - log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " + - "(" + ready.get(type) + " is after " + clock.instant() + ")"); + public void reindex() throws ReindexingLockException { + try (Lock lock = database.lockReindexing()) { + Reindexing reindexing = database.readReindexing(); + for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. + if (ready.get(type).isAfter(clock.instant())) { + log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " + + "(" + ready.get(type) + " is after " + clock.instant() + ")"); + } + else { + // If this is a new document type (or a new cluster), no reindexing is required. + Status status = reindexing.status().getOrDefault(type, + Status.ready(clock.instant()) + .running() + .successful(clock.instant())); + reindexing = reindexing.with(type, progress(type, status)); + } + if (Thread.interrupted()) // Clear interruption status so blocking calls function normally again. + break; } - else { - // If this is a new document type (or a new cluster), no reindexing is required. - Status status = reindexing.status().getOrDefault(type, - Status.ready(clock.instant()) - .running() - .successful(clock.instant())); - reindexing = reindexing.with(type, progress(type, status)); - } - if (Thread.interrupted()) // Clear interruption status so blocking calls function normally again. - break; + database.writeReindexing(reindexing); } - database.writeReindexing(reindexing); } @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 882f356e312..764dca9dfda 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -35,11 +35,17 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableMap; +/** + * Runs in all cluster controller containers, and progresses reindexngg efforts. + * Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting. + * Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown. + * + * @author jonmv + */ public class ReindexingMaintainer extends AbstractComponent { private static final Logger log = Logger.getLogger(Reindexing.class.getName()); - private final ReindexingCurator database; private final Reindexer reindexer; private final ScheduledExecutorService executor; @@ -55,9 +61,11 @@ public class ReindexingMaintainer extends AbstractComponent { ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig, ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) { DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig); - this.database = new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), manager); this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, manager), - parseReady(reindexingConfig, manager), database, access, clock); + parseReady(reindexingConfig, manager), + new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), manager), + access, + clock); this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-")); if (reindexingConfig.enabled()) scheduleStaggered(this::maintain, executor, Duration.ofMinutes(1), clock.instant(), @@ -65,8 +73,8 @@ public class ReindexingMaintainer extends AbstractComponent { } private void maintain() { - try (Lock lock = database.lockReindexing()){ - reindexer.reindex(lock); + try { + reindexer.reindex(); } catch (ReindexingLockException e) { // Some other container is handling the reindexing at this moment, which is fine. |