aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-reindexer
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-11-06 16:55:46 +0100
committerJon Marius Venstad <venstad@gmail.com>2020-11-06 16:55:46 +0100
commit604ddaefbb59f1353a16e25e45ad0c241cc79793 (patch)
tree97ac3f57e69447872436944a78911b7b49b6a3dd /clustercontroller-reindexer
parentcab5c5ae03d0b6bb76f0fbda57e99f6302349bab (diff)
Avoid interrupts for control flow
Diffstat (limited to 'clustercontroller-reindexer')
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java103
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java20
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java30
3 files changed, 84 insertions, 69 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 6f468f5b103..22b27056123 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -4,12 +4,10 @@ package ai.vespa.reindexing;
import ai.vespa.reindexing.Reindexing.Status;
import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException;
import com.yahoo.document.DocumentType;
-import com.yahoo.document.Field;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
-import com.yahoo.documentapi.VisitorControlHandler.CompletionCode;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
@@ -21,10 +19,11 @@ import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import static com.yahoo.documentapi.VisitorControlHandler.CompletionCode.ABORTED;
import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
@@ -47,11 +46,15 @@ public class Reindexer {
private final ReindexingCurator database;
private final DocumentAccess access;
private final Clock clock;
+ private final Phaser phaser = new Phaser(2); // Reindexer and visitor.
+
+ private Reindexing reindexing;
+ private Status status;
public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database,
DocumentAccess access, Clock clock) {
for (DocumentType type : ready.keySet())
- cluster.bucketOf(type); // Verifies this is known.
+ cluster.bucketSpaceOf(type); // Verifies this is known.
this.cluster = cluster;
this.ready = new TreeMap<>(ready); // Iterate through document types in consistent order.
@@ -60,72 +63,92 @@ public class Reindexer {
this.clock = clock;
}
+ /** Tells this to stop reindexing at its leisure. */
+ public void shutdown() {
+ phaser.forceTermination();
+ }
+
/** Starts and tracks reprocessing of ready document types until done, or interrupted. */
public void reindex() throws ReindexingLockException {
+ if (phaser.isTerminated())
+ throw new IllegalStateException("Already shut down");
+
try (Lock lock = database.lockReindexing()) {
- Reindexing reindexing = database.readReindexing();
for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config.
- if (ready.get(type).isAfter(clock.instant())) {
+ if (ready.get(type).isAfter(clock.instant()))
log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " +
"(" + ready.get(type) + " is after " + clock.instant() + ")");
- }
- else {
- // If this is a new document type (or a new cluster), no reindexing is required.
- Status status = reindexing.status().getOrDefault(type,
- Status.ready(clock.instant())
- .running()
- .successful(clock.instant()));
- reindexing = reindexing.with(type, progress(type, status));
- }
- if (Thread.interrupted()) // Clear interruption status so blocking calls function normally again.
+ else
+ progress(type);
+
+ if (phaser.isTerminated())
break;
}
- database.writeReindexing(reindexing);
}
}
@SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\
- private Status progress(DocumentType type, Status status) {
+ private void progress(DocumentType type) {
+ // If this is a new document type (or a new cluster), no reindexing is required.
+ reindexing = database.readReindexing();
+ status = reindexing.status().getOrDefault(type,
+ Status.ready(clock.instant())
+ .running()
+ .successful(clock.instant()));
if (ready.get(type).isAfter(status.startedAt()))
status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required.
+ database.writeReindexing(reindexing = reindexing.with(type, status));
+
switch (status.state()) {
default:
log.log(WARNING, "Unknown reindexing state '" + status.state() + "'");
case FAILED:
log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure");
case SUCCESSFUL: // Intentional fallthrough — all three are done states.
- return status;
+ return;
case RUNNING:
log.log(WARNING, "Unepxected state 'RUNNING' of reindexing of " + type);
case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously.
- log.log(FINE, () -> "Running reindexing of " + type + ", which started at " + status.startedAt());
+ log.log(FINE, () -> "Running reindexing of " + type);
}
// Visit buckets until they're all done, or until we are interrupted.
status = status.running();
- VisitorControlHandler control = new VisitorControlHandler();
+ AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant());
+ VisitorControlHandler control = new VisitorControlHandler() {
+ @Override
+ public void onProgress(ProgressToken token) {
+ super.onProgress(token);
+ status = status.progressed(token);
+ if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10)))
+ database.writeReindexing(reindexing = reindexing.with(type, status));
+ }
+ @Override
+ public void onDone(CompletionCode code, String message) {
+ super.onDone(code, message);
+ phaser.arriveAndAwaitAdvance();
+ }
+ };
visit(type, status.progress().orElse(null), control);
- // Progress is null if no buckets were successfully visited due to interrupt.
- if (control.getProgress() != null)
- status = status.progressed(control.getProgress());
-
// If we were interrupted, the result may not yet be set in the control handler.
- CompletionCode code = control.getResult() != null ? control.getResult().getCode() : ABORTED;
- switch (code) {
+ switch (control.getResult().getCode()) {
default:
log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'");
case FAILURE: // Intentional fallthrough — this is an error.
log.log(WARNING, "Visiting failed: " + control.getResult().getMessage());
- return status.failed(clock.instant(), control.getResult().getMessage());
+ status = status.failed(clock.instant(), control.getResult().getMessage());
+ break;
case ABORTED:
log.log(FINE, () -> "Halting reindexing of " + type + " due to shutdown — will continue later");
- return status.halted();
+ status = status.halted();
+ break;
case SUCCESS:
log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant()));
- return status.successful(clock.instant());
+ status = status.successful(clock.instant());
}
+ database.writeReindexing(reindexing.with(type, status));
}
private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) {
@@ -139,15 +162,9 @@ public class Reindexer {
throw new IllegalStateException(e);
}
- // Wait until done, or interrupted, in which case we abort the visit but don't wait for it to complete.
- try {
- control.waitUntilDone();
- }
- catch (InterruptedException e) {
- control.abort();
- Thread.currentThread().interrupt();
- }
- session.destroy(); // If thread is interrupted, this will not wait, but will retain the interrupted flag.
+ // Wait until done, or shut down, in which case we abort the visit and wait for it to complete.
+ phaser.arriveAndAwaitAdvance();
+ session.destroy();
}
VisitorParameters createParameters(DocumentType type, ProgressToken progress) {
@@ -157,7 +174,7 @@ public class Reindexer {
parameters.setFieldSet(type.getName() + ":[document]");
parameters.setPriority(DocumentProtocol.Priority.LOW_1);
parameters.setRoute(cluster.route());
- parameters.setBucketSpace(cluster.bucketOf(type));
+ parameters.setBucketSpace(cluster.bucketSpaceOf(type));
// parameters.setVisitorLibrary("ReindexVisitor");
return parameters;
}
@@ -183,8 +200,8 @@ public class Reindexer {
return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]";
}
- String bucketOf(DocumentType documentType) {
- return requireNonNull(documentBuckets.get(documentType), "Unknown bucket for " + documentType);
+ String bucketSpaceOf(DocumentType documentType) {
+ return requireNonNull(documentBuckets.get(documentType), "Unknown bucket space for " + documentType);
}
@Override
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
index 7ca98f9a107..054a91458d7 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java
@@ -31,6 +31,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;
+import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toUnmodifiableMap;
@@ -80,19 +81,20 @@ public class ReindexingMaintainer extends AbstractComponent {
// Some other container is handling the reindexing at this moment, which is fine.
}
catch (Exception e) {
- log.log(Level.WARNING, "Exception when reindexing", e);
+ log.log(WARNING, "Exception when reindexing", e);
}
}
@Override
public void deconstruct() {
try {
- executor.shutdownNow();
- if ( ! executor.awaitTermination(20, TimeUnit.SECONDS))
- log.log(Level.SEVERE, "Failed to shut down reindexer within timeout");
+ reindexer.shutdown();
+ executor.shutdown();
+ if ( ! executor.awaitTermination(45, TimeUnit.SECONDS))
+ log.log(WARNING, "Failed to shut down reindexer within timeout");
}
catch (InterruptedException e) {
- log.log(Level.SEVERE, "Interrupted while waiting for reindexer to shut down");
+ log.log(WARNING, "Interrupted while waiting for reindexer to shut down");
Thread.currentThread().interrupt();
}
@@ -121,15 +123,15 @@ public class ReindexingMaintainer extends AbstractComponent {
scheduler.accept(delayMillis, intervalMillis);
}
- static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets,
+ static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig bucketSpaces,
DocumentTypeManager manager) {
return clusters.storage().stream()
.filter(storage -> storage.name().equals(name))
.map(storage -> new Cluster(name,
storage.configid(),
- buckets.cluster(name)
- .documentType().entrySet().stream()
- .collect(toMap(entry -> manager.getDocumentType(entry.getKey()),
+ bucketSpaces.cluster(name)
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> manager.getDocumentType(entry.getKey()),
entry -> entry.getValue().bucketSpace()))))
.findAny()
.orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters"));
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 263c32739a7..418b868b9c0 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -44,7 +44,8 @@ class ReindexerTest {
final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build();
final DocumentTypeManager manager = new DocumentTypeManager(musicConfig);
final DocumentType music = manager.getDocumentType("music");
- final Document document = new Document(music, "id:ns:music::one");
+ final Document document1 = new Document(music, "id:ns:music::one");
+ final Document document2 = new Document(music, "id:ns:music::two");
final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default"));
final ManualClock clock = new ManualClock(Instant.EPOCH);
@@ -78,7 +79,7 @@ class ReindexerTest {
@Timeout(10)
void nothingToDo() throws ReindexingLockException {
Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock);
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document));
+ access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
access.setPhaser(new Phaser(1)); // Would block any visiting until timeout.
reindexer.reindex();
}
@@ -124,7 +125,7 @@ class ReindexerTest {
reindexer.reindex(); // Nothing happens because status is already FAILED for the document type.
assertEquals(failed, database.readReindexing());
- // It's time to reindex the "music" documents — none yet, so this is a no-op.
+ // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp.
database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH.
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(uncheckedReindex(reindexer));
@@ -135,16 +136,17 @@ class ReindexerTest {
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());
- // We add a document and interrupt visiting before this document is visited.
- access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document));
+ // We add a document and interrupt reindexing before the visit is complete.
+ access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1));
clock.advance(Duration.ofMillis(10));
reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
future = executor.submit(uncheckedReindex(reindexer));
while (phaser.getRegisteredParties() == 1)
Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
+ reindexer.shutdown();
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- executor.shutdownNow(); // Interrupt the visit before it completes.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
future.get(); // Write state to database.
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted());
assertEquals(reindexing, database.readReindexing());
@@ -152,29 +154,23 @@ class ReindexerTest {
// Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve.
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted());
database.writeReindexing(reindexing);
- phaser = new Phaser(1);
- access.setPhaser(phaser);
- executor = Executors.newSingleThreadExecutor();
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
future = executor.submit(uncheckedReindex(reindexer));
while (phaser.getRegisteredParties() == 1)
Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
+ reindexer.shutdown(); // Interrupt the visit before it completes.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
- executor.shutdownNow(); // Interrupt the visit before it completes.
+ phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
future.get(); // Write state to database.
assertEquals(reindexing, database.readReindexing());
- // Finally let the visit complete.
- phaser = new Phaser(1);
- access.setPhaser(phaser);
- executor = Executors.newSingleThreadExecutor();
+ // Finally let the visit complete normally.
+ reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock);
future = executor.submit(uncheckedReindex(reindexer));
- while (phaser.getRegisteredParties() == 1)
- Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_<
database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document.
phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete.
- phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which has now completed.
future.get(); // Write state to database.
reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant()));
assertEquals(reindexing, database.readReindexing());