summaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-05 13:24:40 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-05 13:24:40 +0100
commitb71490c769a7a2660622f1bc162daeecbae29d51 (patch)
tree19255c6d3cde003c74ce8e0143750aa8fb04cf16 /clustercontroller-reindexer
parent793896757cb554eb20ea32b3c24aced6515b5c14 (diff)
Clean up and add some javadoc
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java49
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java18
2 files changed, 40 insertions, 27 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 3e66eea17cc..c668f74f469 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -2,13 +2,13 @@
package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexing.Status;
+import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.yahoo.document.DocumentType;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorControlHandler.CompletionCode;
-import com.yahoo.documentapi.VisitorControlSession;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
@@ -17,20 +17,23 @@ import com.yahoo.vespa.curator.Lock;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalUnit;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;
import static com.yahoo.documentapi.VisitorControlHandler.CompletionCode.ABORTED;
-import static java.time.Instant.EPOCH;
-import static java.time.temporal.ChronoUnit.MICROS;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
+/**
+ * Progresses reindexing efforts by creating visitor sessions against its own content cluster,
+ * which send documents straight to storage — via indexing if the document type has "index" mode.
+ * The {@link #reindex} method blocks until interrupted, or until no more reindexing is left to do.
+ *
+ * @author jonmv
+ */
public class Reindexer {
private static final Logger log = Logger.getLogger(Reindexer.class.getName());
@@ -54,25 +57,27 @@ public class Reindexer {
}
/** Starts and tracks reprocessing of ready document types until done, or interrupted. */
- public void reindex(Lock __) {
- Reindexing reindexing = database.readReindexing();
- for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
- if (ready.get(type).isAfter(clock.instant())) {
- log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " +
- "(" + ready.get(type) + " is after " + clock.instant() + ")");
+ public void reindex() throws ReindexingLockException {
+ try (Lock lock = database.lockReindexing()) {
+ Reindexing reindexing = database.readReindexing();
+ for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
+ if (ready.get(type).isAfter(clock.instant())) {
+ log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " +
+ "(" + ready.get(type) + " is after " + clock.instant() + ")");
+ }
+ else {
+ // If this is a new document type (or a new cluster), no reindexing is required.
+ Status status = reindexing.status().getOrDefault(type,
+ Status.ready(clock.instant())
+ .running()
+ .successful(clock.instant()));
+ reindexing = reindexing.with(type, progress(type, status));
+ }
+ if (Thread.interrupted()) // Clear interruption status so blocking calls function normally again.
+ break;
}
- else {
- // If this is a new document type (or a new cluster), no reindexing is required.
- Status status = reindexing.status().getOrDefault(type,
- Status.ready(clock.instant())
- .running()
- .successful(clock.instant()));
- reindexing = reindexing.with(type, progress(type, status));
- }
- if (Thread.interrupted()) // Clear interruption status so blocking calls function normally again.
- break;
+ database.writeReindexing(reindexing);
}
- database.writeReindexing(reindexing);
}
@SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 882f356e312..764dca9dfda 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -35,11 +35,17 @@ import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableMap;
+/**
+ * Runs in all cluster controller containers, and progresses reindexing efforts.
+ * Work is only done by one container at a time, by requiring a shared ZooKeeper lock to be held while visiting.
+ * Whichever maintainer gets the lock holds it until all reindexing is done, or until shutdown.
+ *
+ * @author jonmv
+ */
public class ReindexingMaintainer extends AbstractComponent {
private static final Logger log = Logger.getLogger(Reindexing.class.getName());
- private final ReindexingCurator database;
private final Reindexer reindexer;
private final ScheduledExecutorService executor;
@@ -55,9 +61,11 @@ public class ReindexingMaintainer extends AbstractComponent {
ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig allClustersBucketSpacesConfig,
ReindexingConfig reindexingConfig, DocumentmanagerConfig documentmanagerConfig) {
DocumentTypeManager manager = new DocumentTypeManager(documentmanagerConfig);
- this.database = new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), manager);
this.reindexer = new Reindexer(parseCluster(reindexingConfig.clusterName(), clusterListConfig, allClustersBucketSpacesConfig, manager),
- parseReady(reindexingConfig, manager), database, access, clock);
+ parseReady(reindexingConfig, manager),
+ new ReindexingCurator(Curator.create(zookeepersConfig.zookeeperserverlist()), manager),
+ access,
+ clock);
this.executor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("reindexer-"));
if (reindexingConfig.enabled())
scheduleStaggered(this::maintain, executor, Duration.ofMinutes(1), clock.instant(),
@@ -65,8 +73,8 @@ public class ReindexingMaintainer extends AbstractComponent {
}
private void maintain() {
- try (Lock lock = database.lockReindexing()){
- reindexer.reindex(lock);
+ try {
+ reindexer.reindex();
}
catch (ReindexingLockException e) {
// Some other container is handling the reindexing at this moment, which is fine.