diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-06 16:55:46 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-06 16:55:46 +0100 |
commit | 604ddaefbb59f1353a16e25e45ad0c241cc79793 (patch) | |
tree | 97ac3f57e69447872436944a78911b7b49b6a3dd | |
parent | cab5c5ae03d0b6bb76f0fbda57e99f6302349bab (diff) |
Avoid interrupts for control flow
6 files changed, 92 insertions, 72 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 6f468f5b103..22b27056123 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -4,12 +4,10 @@ package ai.vespa.reindexing; import ai.vespa.reindexing.Reindexing.Status; import ai.vespa.reindexing.ReindexingCurator.ReindexingLockException; import com.yahoo.document.DocumentType; -import com.yahoo.document.Field; import com.yahoo.document.select.parser.ParseException; import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.VisitorControlHandler; -import com.yahoo.documentapi.VisitorControlHandler.CompletionCode; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; @@ -21,10 +19,11 @@ import java.time.Instant; import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; -import java.util.stream.Collectors; -import static com.yahoo.documentapi.VisitorControlHandler.CompletionCode.ABORTED; import static java.util.Objects.requireNonNull; import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; @@ -47,11 +46,15 @@ public class Reindexer { private final ReindexingCurator database; private final DocumentAccess access; private final Clock clock; + private final Phaser phaser = new Phaser(2); // Reindexer and visitor. + + private Reindexing reindexing; + private Status status; public Reindexer(Cluster cluster, Map<DocumentType, Instant> ready, ReindexingCurator database, DocumentAccess access, Clock clock) { for (DocumentType type : ready.keySet()) - cluster.bucketOf(type); // Verifies this is known. + cluster.bucketSpaceOf(type); // Verifies this is known. this.cluster = cluster; this.ready = new TreeMap<>(ready); // Iterate through document types in consistent order. @@ -60,72 +63,92 @@ public class Reindexer { this.clock = clock; } + /** Tells this to stop reindexing at its leisure. */ + public void shutdown() { + phaser.forceTermination(); + } + /** Starts and tracks reprocessing of ready document types until done, or interrupted. */ public void reindex() throws ReindexingLockException { + if (phaser.isTerminated()) + throw new IllegalStateException("Already shut down"); + try (Lock lock = database.lockReindexing()) { - Reindexing reindexing = database.readReindexing(); for (DocumentType type : ready.keySet()) { // We consider only document types for which we have config. - if (ready.get(type).isAfter(clock.instant())) { + if (ready.get(type).isAfter(clock.instant())) log.log(WARNING, "Received config for reindexing which is ready in the future — will process later " + "(" + ready.get(type) + " is after " + clock.instant() + ")"); - } - else { - // If this is a new document type (or a new cluster), no reindexing is required. - Status status = reindexing.status().getOrDefault(type, - Status.ready(clock.instant()) - .running() - .successful(clock.instant())); - reindexing = reindexing.with(type, progress(type, status)); - } - if (Thread.interrupted()) // Clear interruption status so blocking calls function normally again. + else + progress(type); + + if (phaser.isTerminated()) break; } - database.writeReindexing(reindexing); } } @SuppressWarnings("fallthrough") // (ノಠ ∩ಠ)ノ彡( \o°o)\ - private Status progress(DocumentType type, Status status) { + private void progress(DocumentType type) { + // If this is a new document type (or a new cluster), no reindexing is required. + reindexing = database.readReindexing(); + status = reindexing.status().getOrDefault(type, + Status.ready(clock.instant()) + .running() + .successful(clock.instant())); if (ready.get(type).isAfter(status.startedAt())) status = Status.ready(clock.instant()); // Need to restart, as a newer reindexing is required. + database.writeReindexing(reindexing = reindexing.with(type, status)); + switch (status.state()) { default: log.log(WARNING, "Unknown reindexing state '" + status.state() + "'"); case FAILED: log.log(FINE, () -> "Not continuing reindexing of " + type + " due to previous failure"); case SUCCESSFUL: // Intentional fallthrough — all three are done states. - return status; + return; case RUNNING: log.log(WARNING, "Unepxected state 'RUNNING' of reindexing of " + type); case READY: // Intentional fallthrough — must just assume we failed updating state when exiting previously. - log.log(FINE, () -> "Running reindexing of " + type + ", which started at " + status.startedAt()); + log.log(FINE, () -> "Running reindexing of " + type); } // Visit buckets until they're all done, or until we are interrupted. status = status.running(); - VisitorControlHandler control = new VisitorControlHandler(); + AtomicReference<Instant> progressLastStored = new AtomicReference<>(clock.instant()); + VisitorControlHandler control = new VisitorControlHandler() { + @Override + public void onProgress(ProgressToken token) { + super.onProgress(token); + status = status.progressed(token); + if (progressLastStored.get().isBefore(clock.instant().minusSeconds(10))) + database.writeReindexing(reindexing = reindexing.with(type, status)); + } + @Override + public void onDone(CompletionCode code, String message) { + super.onDone(code, message); + phaser.arriveAndAwaitAdvance(); + } + }; visit(type, status.progress().orElse(null), control); - // Progress is null if no buckets were successfully visited due to interrupt. - if (control.getProgress() != null) - status = status.progressed(control.getProgress()); - // If we were interrupted, the result may not yet be set in the control handler. - CompletionCode code = control.getResult() != null ? control.getResult().getCode() : ABORTED; - switch (code) { + switch (control.getResult().getCode()) { default: log.log(WARNING, "Unexpected visitor result '" + control.getResult().getCode() + "'"); case FAILURE: // Intentional fallthrough — this is an error. log.log(WARNING, "Visiting failed: " + control.getResult().getMessage()); - return status.failed(clock.instant(), control.getResult().getMessage()); + status = status.failed(clock.instant(), control.getResult().getMessage()); + break; case ABORTED: log.log(FINE, () -> "Halting reindexing of " + type + " due to shutdown — will continue later"); - return status.halted(); + status = status.halted(); + break; case SUCCESS: log.log(INFO, "Completed reindexing of " + type + " after " + Duration.between(status.startedAt(), clock.instant())); - return status.successful(clock.instant()); + status = status.successful(clock.instant()); } + database.writeReindexing(reindexing.with(type, status)); } private void visit(DocumentType type, ProgressToken progress, VisitorControlHandler control) { @@ -139,15 +162,9 @@ public class Reindexer { throw new IllegalStateException(e); } - // Wait until done, or interrupted, in which case we abort the visit but don't wait for it to complete. - try { - control.waitUntilDone(); - } - catch (InterruptedException e) { - control.abort(); - Thread.currentThread().interrupt(); - } - session.destroy(); // If thread is interrupted, this will not wait, but will retain the interrupted flag. + // Wait until done, or shut down, in which case we abort the visit and wait for it to complete. + phaser.arriveAndAwaitAdvance(); + session.destroy(); } VisitorParameters createParameters(DocumentType type, ProgressToken progress) { @@ -157,7 +174,7 @@ public class Reindexer { parameters.setFieldSet(type.getName() + ":[document]"); parameters.setPriority(DocumentProtocol.Priority.LOW_1); parameters.setRoute(cluster.route()); - parameters.setBucketSpace(cluster.bucketOf(type)); + parameters.setBucketSpace(cluster.bucketSpaceOf(type)); // parameters.setVisitorLibrary("ReindexVisitor"); return parameters; } @@ -183,8 +200,8 @@ public class Reindexer { return "[Storage:cluster=" + name + ";clusterconfigid=" + configId + "]"; } - String bucketOf(DocumentType documentType) { - return requireNonNull(documentBuckets.get(documentType), "Unknown bucket for " + documentType); + String bucketSpaceOf(DocumentType documentType) { + return requireNonNull(documentBuckets.get(documentType), "Unknown bucket space for " + documentType); } @Override diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java index 7ca98f9a107..054a91458d7 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/ReindexingMaintainer.java @@ -31,6 +31,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; +import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toUnmodifiableMap; @@ -80,19 +81,20 @@ public class ReindexingMaintainer extends AbstractComponent { // Some other container is handling the reindexing at this moment, which is fine. } catch (Exception e) { - log.log(Level.WARNING, "Exception when reindexing", e); + log.log(WARNING, "Exception when reindexing", e); } } @Override public void deconstruct() { try { - executor.shutdownNow(); - if ( ! executor.awaitTermination(20, TimeUnit.SECONDS)) - log.log(Level.SEVERE, "Failed to shut down reindexer within timeout"); + reindexer.shutdown(); + executor.shutdown(); + if ( ! executor.awaitTermination(45, TimeUnit.SECONDS)) + log.log(WARNING, "Failed to shut down reindexer within timeout"); } catch (InterruptedException e) { - log.log(Level.SEVERE, "Interrupted while waiting for reindexer to shut down"); + log.log(WARNING, "Interrupted while waiting for reindexer to shut down"); Thread.currentThread().interrupt(); } @@ -121,15 +123,15 @@ public class ReindexingMaintainer extends AbstractComponent { scheduler.accept(delayMillis, intervalMillis); } - static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets, + static Cluster parseCluster(String name, ClusterListConfig clusters, AllClustersBucketSpacesConfig bucketSpaces, DocumentTypeManager manager) { return clusters.storage().stream() .filter(storage -> storage.name().equals(name)) .map(storage -> new Cluster(name, storage.configid(), - buckets.cluster(name) - .documentType().entrySet().stream() - .collect(toMap(entry -> manager.getDocumentType(entry.getKey()), + bucketSpaces.cluster(name) + .documentType().entrySet().stream() + .collect(toMap(entry -> manager.getDocumentType(entry.getKey()), entry -> entry.getValue().bucketSpace())))) .findAny() .orElseThrow(() -> new IllegalStateException("This cluster (" + name + ") not among the list of clusters")); diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 263c32739a7..418b868b9c0 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -44,7 +44,8 @@ class ReindexerTest { final DocumentmanagerConfig musicConfig = Deriver.getDocumentManagerConfig("src/test/resources/schemas/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(musicConfig); final DocumentType music = manager.getDocumentType("music"); - final Document document = new Document(music, "id:ns:music::one"); + final Document document1 = new Document(music, "id:ns:music::one"); + final Document document2 = new Document(music, "id:ns:music::two"); final Cluster cluster = new Cluster("cluster", "id", Map.of(music, "default")); final ManualClock clock = new ManualClock(Instant.EPOCH); @@ -78,7 +79,7 @@ class ReindexerTest { @Timeout(10) void nothingToDo() throws ReindexingLockException { Reindexer reindexer = new Reindexer(cluster, Map.of(), database, access, clock); - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document)); + access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); access.setPhaser(new Phaser(1)); // Would block any visiting until timeout. reindexer.reindex(); } @@ -124,7 +125,7 @@ class ReindexerTest { reindexer.reindex(); // Nothing happens because status is already FAILED for the document type. assertEquals(failed, database.readReindexing()); - // It's time to reindex the "music" documents — none yet, so this is a no-op. + // It's time to reindex the "music" documents — none yet, so this is a no-op, which just updates the timestamp. database.writeReindexing(reindexing); // Restore state where reindexing was complete at 5 ms after EPOCH. ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit(uncheckedReindex(reindexer)); @@ -135,16 +136,17 @@ class ReindexerTest { reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); - // We add a document and interrupt visiting before this document is visited. - access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document)); + // We add a document and interrupt reindexing before the visit is complete. + access.createSyncSession(new SyncParameters.Builder().build()).put(new DocumentPut(document1)); clock.advance(Duration.ofMillis(10)); reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); future = executor.submit(uncheckedReindex(reindexer)); while (phaser.getRegisteredParties() == 1) Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. + reindexer.shutdown(); phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - executor.shutdownNow(); // Interrupt the visit before it completes. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. future.get(); // Write state to database. reindexing = reindexing.with(music, Status.ready(clock.instant()).running().halted()); assertEquals(reindexing, database.readReindexing()); @@ -152,29 +154,23 @@ class ReindexerTest { // Manually put a progress token in there, to verify this is used when resuming, and that we store what we retrieve. reindexing = reindexing.with(music, Status.ready(clock.instant()).running().progressed(new ProgressToken()).halted()); database.writeReindexing(reindexing); - phaser = new Phaser(1); - access.setPhaser(phaser); - executor = Executors.newSingleThreadExecutor(); + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); future = executor.submit(uncheckedReindex(reindexer)); while (phaser.getRegisteredParties() == 1) Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. + reindexer.shutdown(); // Interrupt the visit before it completes. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. - executor.shutdownNow(); // Interrupt the visit before it completes. + phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. future.get(); // Write state to database. assertEquals(reindexing, database.readReindexing()); - // Finally let the visit complete. - phaser = new Phaser(1); - access.setPhaser(phaser); - executor = Executors.newSingleThreadExecutor(); + // Finally let the visit complete normally. + reindexer = new Reindexer(cluster, Map.of(music, Instant.ofEpochMilli(20)), database, access, clock); future = executor.submit(uncheckedReindex(reindexer)); - while (phaser.getRegisteredParties() == 1) - Thread.sleep(1); // Need to wait for the visitor to register, without any proper way of doing it >_< database.writeReindexing(Reindexing.empty()); // Wreck database while running, to verify we write the expected value. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now send the document. phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which may now complete. - phaser.arriveAndAwaitAdvance(); // Synchronize with visitor, which has now completed. future.get(); // Write state to database. reindexing = reindexing.with(music, Status.ready(clock.instant()).running().successful(clock.instant())); assertEquals(reindexing, database.readReindexing()); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java index 0dd96275f9d..e98be6871b4 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorIterator.java @@ -26,8 +26,8 @@ import java.util.logging.Logger; */ public class VisitorIterator { - private ProgressToken progressToken; - private BucketSource bucketSource; + private final ProgressToken progressToken; + private final BucketSource bucketSource; private int distributionBitCount; private static final Logger log = Logger.getLogger(VisitorIterator.class.getName()); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index eba884c5ee8..43bfb9ca667 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -186,6 +186,12 @@ public class LocalVisitorSession implements VisitorSession { @Override public void destroy() { abort(); + try { + control.waitUntilDone(0); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index cf9f78e3679..257d491ea93 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -1167,7 +1167,6 @@ public class MessageBusVisitorSession implements VisitorSession { } } catch (InterruptedException e) { log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed"); - Thread.currentThread().interrupt(); } finally { try { sender.destroy(); |