diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2024-05-16 14:52:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-16 14:52:44 +0200 |
commit | 975d689861eb5350286f294d79ecf4942f14473a (patch) | |
tree | bcd22afdcf269a223ebe9e53bb3baa1cf3145c9f | |
parent | cced1ff316a6960a0a6d091f43b0eafd3ba2a78e (diff) | |
parent | 885115846288594f2c663432da4355fe1a3043ad (diff) |
Merge pull request #31224 from vespa-engine/jonmv/modify-reindexing-speed
Allow modifying reindexing speed, and speed 0 halts reindexing
7 files changed, 111 insertions, 38 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java index 13ced149839..74e6dfacb00 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java @@ -40,7 +40,7 @@ import static java.util.logging.Level.WARNING; * * @author jonmv */ -public class Reindexer { +class Reindexer { private static final Logger log = Logger.getLogger(Reindexer.class.getName()); @@ -55,8 +55,8 @@ public class Reindexer { private final Clock clock; private final Phaser phaser = new Phaser(2); // Reindexer and visitor. - public Reindexer(Cluster cluster, List<Trigger> ready, ReindexingCurator database, - DocumentAccess access, Metric metric, Clock clock) { + Reindexer(Cluster cluster, List<Trigger> ready, ReindexingCurator database, + DocumentAccess access, Metric metric, Clock clock) { this(cluster, ready, database, @@ -91,12 +91,12 @@ public class Reindexer { } /** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */ - public void shutdown() { + void shutdown() { phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed. } /** Starts and tracks reprocessing of ready document types until done, or interrupted. */ - public void reindex() throws ReindexingLockException { + void reindex() throws ReindexingLockException { if (phaser.isTerminated()) throw new IllegalStateException("Already shut down"); @@ -114,7 +114,7 @@ public class Reindexer { if (trigger.readyAt().isAfter(clock.instant())) log.log(INFO, "Received config for reindexing which is ready in the future — will process later " + "(" + trigger.readyAt() + " is after " + clock.instant() + ")"); - else + else if (trigger.speed() > 0) progress(trigger.type(), trigger.speed(), reindexing, new AtomicReference<>(reindexing.get().status().get(trigger.type()))); if (phaser.isTerminated()) diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java index ca9f317e840..35002c700ea 100644 --- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java +++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java @@ -99,13 +99,12 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler implement } private static String toString(Reindexing.State state) { - switch (state) { - case READY: return "pending"; - case RUNNING: return "running"; - case SUCCESSFUL: return "successful"; - case FAILED: return "failed"; - default: throw new IllegalArgumentException("Unexpected state '" + state + "'"); - } + return switch (state) { + case READY -> "pending"; + case RUNNING -> "running"; + case SUCCESSFUL -> "successful"; + case FAILED -> "failed"; + }; } } diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java index 954162d4d3d..49d6a56b583 100644 --- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java +++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java @@ -169,8 +169,13 @@ class ReindexerTest { assertEquals(reindexing, database.readReindexing("cluster")); // When failure grace period is over, reindexing resumes as usual. + // However, with speed set to 0, nothing happens. clock.advance(Duration.ofMillis(1)); shutDown.set(false); + new Reindexer(cluster, List.of(new Trigger(music, Instant.ofEpochMilli(30), 0)), database, ReindexerTest::failIfCalled, metric, clock).reindex(); + assertEquals(reindexing, database.readReindexing("cluster")); + + // With speed set to a positive value again, reindexing resumes, and eventually completes successfully. new Reindexer(cluster, triggers(30), database, parameters -> { executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK")); return () -> shutDown.set(true); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java index b43c34f20c5..4b1271d30d5 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java @@ -38,12 +38,26 @@ public class ApplicationReindexing implements Reindexing { /** Returns a copy of this with reindexing for the given document type in the given cluster ready at the given instant. */ public ApplicationReindexing withReady(String cluster, String documentType, Instant readyAt, double speed, String cause) { + if (speed <= 0) + throw new IllegalArgumentException("Initial reindexing speed must be in (0, 10], but was " + speed); + Cluster current = clusters.getOrDefault(cluster, Cluster.empty()); Cluster modified = new Cluster(current.pending, with(documentType, new Status(readyAt, speed, cause), current.ready)); return new ApplicationReindexing(enabled, with(cluster, modified, clusters)); } + /** Returns a copy of this with updated speed for the given document type in the given cluster. */ + public ApplicationReindexing withSpeed(String cluster, String documentType, double speed) { + Cluster current = clusters.get(cluster); + if (current == null) throw new IllegalArgumentException("no existing reindexing for cluster '" + cluster + "'"); + Status status = current.ready.get(documentType); + if (status == null) throw new IllegalArgumentException("no existing reindexing for document type '" + documentType + "' in cluster '" + cluster + "'"); + Cluster modified = new Cluster(current.pending, + with(documentType, new Status(status.ready(), speed, status.cause()), current.ready)); + return new ApplicationReindexing(enabled, with(cluster, modified, clusters)); + } + /** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */ public ApplicationReindexing withPending(String cluster, String documentType, long requiredGeneration) { Cluster current = clusters.getOrDefault(cluster, Cluster.empty()); @@ -185,8 +199,8 @@ public class ApplicationReindexing implements Reindexing { private final String cause; Status(Instant ready, double speed, String cause) { - if (speed <= 0 || 10 < speed) - throw new IllegalArgumentException("Reindexing speed must be in (0, 10], but was " + speed); + if (speed < 0 || 10 < speed) + throw new IllegalArgumentException("Reindexing speed must be in [0, 10], but was " + speed); this.ready = ready.truncatedTo(ChronoUnit.MILLIS); this.speed = speed; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java index cf88ab817fd..13914227971 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java @@ -124,6 +124,14 @@ public class ApplicationHandler extends HttpHandler { } @Override + public HttpResponse handlePUT(HttpRequest request) { + Path path = new Path(request.getUri()); + + if (path.matches("/application/v2/tenant/{tenant}/application/{application}/environment/{ignore}/region/{ignore}/instance/{instance}/reindex")) return updateReindexing(applicationId(path), request); + return ErrorResponse.notFoundError("Nothing at " + path); + } + + @Override public HttpResponse handleDELETE(HttpRequest request) { Path path = new Path(request.getUri()); @@ -259,6 +267,33 @@ public class ApplicationHandler extends HttpHandler { } private HttpResponse triggerReindexing(ApplicationId applicationId, HttpRequest request) { + double speed = Double.parseDouble(Objects.requireNonNullElse(request.getProperty("speed"), "1")); + String cause = Objects.requireNonNullElse(request.getProperty("cause"), "reindexing for an unknown reason"); + Instant now = applicationRepository.clock().instant(); + + return modifyReindexing(applicationId, request, + (original, cluster, type) -> original.withReady(cluster, type, now, speed, cause), + new StringJoiner(", ", "Reindexing document types ", " of application " + applicationId) + .setEmptyValue("Not reindexing any document types of application " + applicationId)); + } + + private HttpResponse updateReindexing(ApplicationId applicationId, HttpRequest request) { + String speedValue = request.getProperty("speed"); + if (speedValue == null) + throw new IllegalArgumentException("request must specify 'speed' parameter"); + + return modifyReindexing(applicationId, request, + (original, cluster, type) -> original.withSpeed(cluster, type, Double.parseDouble(speedValue)), + new StringJoiner(", ", "Set reindexing speed to '" + speedValue + "' for document types ", " of application " + applicationId) + .setEmptyValue("Changed reindexing of no document types of application " + applicationId)); + } + + private interface ReindexingModification { + ApplicationReindexing apply(ApplicationReindexing original, String cluster, String type); + } + + private HttpResponse modifyReindexing(ApplicationId applicationId, HttpRequest request, + ReindexingModification modification, StringJoiner messageBuilder) { Model model = getActiveModelOrThrow(applicationId); Map<String, Set<String>> documentTypes = model.documentTypesByCluster(); Map<String, Set<String>> indexedDocumentTypes = model.indexedDocumentTypesByCluster(); @@ -266,11 +301,8 @@ public class ApplicationHandler extends HttpHandler { boolean indexedOnly = request.getBooleanProperty("indexedOnly"); Set<String> clusters = StringUtilities.split(request.getProperty("clusterId")); Set<String> types = StringUtilities.split(request.getProperty("documentType")); - double speed = Double.parseDouble(Objects.requireNonNullElse(request.getProperty("speed"), "1")); - String cause = Objects.requireNonNullElse(request.getProperty("cause"), "reindexing for an unknown reason"); Map<String, Set<String>> reindexed = new TreeMap<>(); - Instant now = applicationRepository.clock().instant(); applicationRepository.modifyReindexing(applicationId, reindexing -> { for (String cluster : clusters.isEmpty() ? documentTypes.keySet() : clusters) { if ( ! documentTypes.containsKey(cluster)) @@ -283,7 +315,7 @@ public class ApplicationHandler extends HttpHandler { String.join(", ", documentTypes.get(cluster))); if ( ! indexedOnly || indexedDocumentTypes.get(cluster).contains(type)) { - reindexing = reindexing.withReady(cluster, type, now, speed, cause); + reindexing = modification.apply(reindexing, cluster, type); reindexed.computeIfAbsent(cluster, __ -> new TreeSet<>()).add(type); } } @@ -292,13 +324,10 @@ public class ApplicationHandler extends HttpHandler { }); return new MessageResponse(reindexed.entrySet().stream() - .filter(cluster -> ! cluster.getValue().isEmpty()) - .map(cluster -> "[" + String.join(", ", cluster.getValue()) + "] in '" + cluster.getKey() + "'") - .reduce(new StringJoiner(", ", "Reindexing document types ", " of application " + applicationId) - .setEmptyValue("Not reindexing any document types of application " + applicationId), - StringJoiner::add, - StringJoiner::merge) - .toString()); + .filter(cluster -> ! cluster.getValue().isEmpty()) + .map(cluster -> "[" + String.join(", ", cluster.getValue()) + "] in '" + cluster.getKey() + "'") + .reduce(messageBuilder, StringJoiner::add, StringJoiner::merge) + .toString()); } public HttpResponse disableReindexing(ApplicationId applicationId) { diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java index 14d28363aa6..258917527d1 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java @@ -11,6 +11,7 @@ import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; /** @@ -66,6 +67,26 @@ public class ApplicationReindexingTest { "b", new Status(Instant.ofEpochMilli(2), 3, "test reindexing")), reindexing.clusters().get("two").ready()); + assertEquals(Map.of("a", new Status(Instant.ofEpochMilli(3), 2, "test reindexing"), + "b", new Status(Instant.ofEpochMilli(2), 0, "test reindexing")), + reindexing.withSpeed("two", "b", 0).clusters().get("two").ready()); + + assertEquals("no existing reindexing for cluster 'three'", + assertThrows(IllegalArgumentException.class, () -> reindexing.withSpeed("three", "a", 4)) + .getMessage()); + + assertEquals("no existing reindexing for document type 'c' in cluster 'two'", + assertThrows(IllegalArgumentException.class, () -> reindexing.withSpeed("two", "c", 4)) + .getMessage()); + + assertEquals("Initial reindexing speed must be in (0, 10], but was 0.0", + assertThrows(IllegalArgumentException.class, () -> reindexing.withReady("two", "b", Instant.EPOCH, 0, "no")) + .getMessage()); + + assertEquals("Reindexing speed must be in [0, 10], but was -1.0", + assertThrows(IllegalArgumentException.class, () -> reindexing.withSpeed("two", "b", -1)) + .getMessage()); + assertEquals(Map.of("b", 20L), reindexing.clusters().get("two").pending()); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index 9811f4826e5..01ddceb33a2 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -79,6 +79,7 @@ import static com.yahoo.container.jdisc.HttpRequest.createTestRequest; import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE; import static com.yahoo.jdisc.http.HttpRequest.Method.GET; import static com.yahoo.jdisc.http.HttpRequest.Method.POST; +import static com.yahoo.jdisc.http.HttpRequest.Method.PUT; import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals; import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse; import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse; @@ -311,6 +312,11 @@ public class ApplicationHandlerTest { assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); + clock.advance(Duration.ofSeconds(1)); + reindex(applicationId, PUT, "?documentType=bar&speed=0", "{\"message\":\"Set reindexing speed to '0' for document types [bar] in 'boo', [bar] in 'foo' of application default.default\"}"); + expected = expected.withSpeed("boo", "bar", 0) + .withSpeed("foo", "bar", 0); + reindexing(applicationId, DELETE, "{\"message\":\"Reindexing disabled\"}"); expected = expected.enabled(false); assertEquals(expected, @@ -333,8 +339,8 @@ public class ApplicationHandlerTest { " }," + " \"ready\": {" + " \"bar\": {" + - " \"readyMillis\": " + (now - 1000) + ", " + - " \"speed\": 1.0," + + " \"readyMillis\": " + (now - 2000) + ", " + + " \"speed\": 0.0," + " \"cause\": \"reindexing\"," + " \"state\": \"pending\"" + " }" + @@ -344,19 +350,19 @@ public class ApplicationHandlerTest { " \"pending\": {}," + " \"ready\": {" + " \"bar\": {" + - " \"readyMillis\": " + now + ", " + - " \"speed\": 0.1," + + " \"readyMillis\": " + (now - 1000) + ", " + + " \"speed\": 0.0," + " \"cause\": \"reindexing\"," + " \"state\": \"pending\"" + " }," + " \"bax\": {" + - " \"readyMillis\": " + (now - 1000) + ", " + + " \"readyMillis\": " + (now - 2000) + ", " + " \"speed\": 1.0," + " \"cause\": \"reindexing\"," + " \"state\": \"pending\"" + " }," + " \"baz\": {" + - " \"readyMillis\": " + now + ", " + + " \"readyMillis\": " + (now - 1000) + ", " + " \"speed\": 0.1," + " \"cause\": \"reindexing\"," + " \"state\": \"pending\"" + @@ -427,11 +433,6 @@ public class ApplicationHandlerTest { } @Test - public void testPutIsIllegal() throws IOException { - assertNotAllowed(Method.PUT); - } - - @Test public void testFileDistributionStatus() throws Exception { applicationRepository.deploy(testApp, prepareParams(applicationId)); Zone zone = Zone.defaultZone(); @@ -874,8 +875,12 @@ public class ApplicationHandlerTest { } private void reindex(ApplicationId application, String query, String message) throws IOException { + reindex(application, POST, query, message); + } + + private void reindex(ApplicationId application, Method method, String query, String message) throws IOException { String reindexUrl = toUrlPath(application, Zone.defaultZone(), true) + "/reindex" + query; - assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, POST)), 200, message); + assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, method)), 200, message); } private void restart(ApplicationId application, Zone zone) throws IOException { |