summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2024-05-16 14:52:44 +0200
committerGitHub <noreply@github.com>2024-05-16 14:52:44 +0200
commit975d689861eb5350286f294d79ecf4942f14473a (patch)
treebcd22afdcf269a223ebe9e53bb3baa1cf3145c9f
parentcced1ff316a6960a0a6d091f43b0eafd3ba2a78e (diff)
parent885115846288594f2c663432da4355fe1a3043ad (diff)
Merge pull request #31224 from vespa-engine/jonmv/modify-reindexing-speed
Allow modifying reindexing speed, and speed 0 halts reindexing
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java12
-rw-r--r--clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java13
-rw-r--r--clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java5
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java18
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java51
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java21
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java29
7 files changed, 111 insertions, 38 deletions
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
index 13ced149839..74e6dfacb00 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/Reindexer.java
@@ -40,7 +40,7 @@ import static java.util.logging.Level.WARNING;
*
* @author jonmv
*/
-public class Reindexer {
+class Reindexer {
private static final Logger log = Logger.getLogger(Reindexer.class.getName());
@@ -55,8 +55,8 @@ public class Reindexer {
private final Clock clock;
private final Phaser phaser = new Phaser(2); // Reindexer and visitor.
- public Reindexer(Cluster cluster, List<Trigger> ready, ReindexingCurator database,
- DocumentAccess access, Metric metric, Clock clock) {
+ Reindexer(Cluster cluster, List<Trigger> ready, ReindexingCurator database,
+ DocumentAccess access, Metric metric, Clock clock) {
this(cluster,
ready,
database,
@@ -91,12 +91,12 @@ public class Reindexer {
}
/** Lets the reindexer abort any ongoing visit session, wait for it to complete normally, then exit. */
- public void shutdown() {
+ void shutdown() {
phaser.forceTermination(); // All parties waiting on this phaser are immediately allowed to proceed.
}
/** Starts and tracks reprocessing of ready document types until done, or interrupted. */
- public void reindex() throws ReindexingLockException {
+ void reindex() throws ReindexingLockException {
if (phaser.isTerminated())
throw new IllegalStateException("Already shut down");
@@ -114,7 +114,7 @@ public class Reindexer {
if (trigger.readyAt().isAfter(clock.instant()))
log.log(INFO, "Received config for reindexing which is ready in the future — will process later " +
"(" + trigger.readyAt() + " is after " + clock.instant() + ")");
- else
+ else if (trigger.speed() > 0)
progress(trigger.type(), trigger.speed(), reindexing, new AtomicReference<>(reindexing.get().status().get(trigger.type())));
if (phaser.isTerminated())
diff --git a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
index ca9f317e840..35002c700ea 100644
--- a/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
+++ b/clustercontroller-reindexer/src/main/java/ai/vespa/reindexing/http/ReindexingV1ApiHandler.java
@@ -99,13 +99,12 @@ public class ReindexingV1ApiHandler extends ThreadedHttpRequestHandler implement
}
private static String toString(Reindexing.State state) {
- switch (state) {
- case READY: return "pending";
- case RUNNING: return "running";
- case SUCCESSFUL: return "successful";
- case FAILED: return "failed";
- default: throw new IllegalArgumentException("Unexpected state '" + state + "'");
- }
+ return switch (state) {
+ case READY -> "pending";
+ case RUNNING -> "running";
+ case SUCCESSFUL -> "successful";
+ case FAILED -> "failed";
+ };
}
}
diff --git a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
index 954162d4d3d..49d6a56b583 100644
--- a/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
+++ b/clustercontroller-reindexer/src/test/java/ai/vespa/reindexing/ReindexerTest.java
@@ -169,8 +169,13 @@ class ReindexerTest {
assertEquals(reindexing, database.readReindexing("cluster"));
// When failure grace period is over, reindexing resumes as usual.
+ // However, with speed set to 0, nothing happens.
clock.advance(Duration.ofMillis(1));
shutDown.set(false);
+ new Reindexer(cluster, List.of(new Trigger(music, Instant.ofEpochMilli(30), 0)), database, ReindexerTest::failIfCalled, metric, clock).reindex();
+ assertEquals(reindexing, database.readReindexing("cluster"));
+
+ // With speed set to a positive value again, reindexing resumes, and eventually completes successfully.
new Reindexer(cluster, triggers(30), database, parameters -> {
executor.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "OK"));
return () -> shutDown.set(true);
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java
index b43c34f20c5..4b1271d30d5 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationReindexing.java
@@ -38,12 +38,26 @@ public class ApplicationReindexing implements Reindexing {
/** Returns a copy of this with reindexing for the given document type in the given cluster ready at the given instant. */
public ApplicationReindexing withReady(String cluster, String documentType, Instant readyAt, double speed, String cause) {
+ if (speed <= 0)
+ throw new IllegalArgumentException("Initial reindexing speed must be in (0, 10], but was " + speed);
+
Cluster current = clusters.getOrDefault(cluster, Cluster.empty());
Cluster modified = new Cluster(current.pending,
with(documentType, new Status(readyAt, speed, cause), current.ready));
return new ApplicationReindexing(enabled, with(cluster, modified, clusters));
}
+ /** Returns a copy of this with updated speed for the given document type in the given cluster. */
+ public ApplicationReindexing withSpeed(String cluster, String documentType, double speed) {
+ Cluster current = clusters.get(cluster);
+ if (current == null) throw new IllegalArgumentException("no existing reindexing for cluster '" + cluster + "'");
+ Status status = current.ready.get(documentType);
+ if (status == null) throw new IllegalArgumentException("no existing reindexing for document type '" + documentType + "' in cluster '" + cluster + "'");
+ Cluster modified = new Cluster(current.pending,
+ with(documentType, new Status(status.ready(), speed, status.cause()), current.ready));
+ return new ApplicationReindexing(enabled, with(cluster, modified, clusters));
+ }
+
/** Returns a copy of this with a pending reindexing at the given generation, for the given document type. */
public ApplicationReindexing withPending(String cluster, String documentType, long requiredGeneration) {
Cluster current = clusters.getOrDefault(cluster, Cluster.empty());
@@ -185,8 +199,8 @@ public class ApplicationReindexing implements Reindexing {
private final String cause;
Status(Instant ready, double speed, String cause) {
- if (speed <= 0 || 10 < speed)
- throw new IllegalArgumentException("Reindexing speed must be in (0, 10], but was " + speed);
+ if (speed < 0 || 10 < speed)
+ throw new IllegalArgumentException("Reindexing speed must be in [0, 10], but was " + speed);
this.ready = ready.truncatedTo(ChronoUnit.MILLIS);
this.speed = speed;
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
index cf88ab817fd..13914227971 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
@@ -124,6 +124,14 @@ public class ApplicationHandler extends HttpHandler {
}
@Override
+ public HttpResponse handlePUT(HttpRequest request) {
+ Path path = new Path(request.getUri());
+
+ if (path.matches("/application/v2/tenant/{tenant}/application/{application}/environment/{ignore}/region/{ignore}/instance/{instance}/reindex")) return updateReindexing(applicationId(path), request);
+ return ErrorResponse.notFoundError("Nothing at " + path);
+ }
+
+ @Override
public HttpResponse handleDELETE(HttpRequest request) {
Path path = new Path(request.getUri());
@@ -259,6 +267,33 @@ public class ApplicationHandler extends HttpHandler {
}
private HttpResponse triggerReindexing(ApplicationId applicationId, HttpRequest request) {
+ double speed = Double.parseDouble(Objects.requireNonNullElse(request.getProperty("speed"), "1"));
+ String cause = Objects.requireNonNullElse(request.getProperty("cause"), "reindexing for an unknown reason");
+ Instant now = applicationRepository.clock().instant();
+
+ return modifyReindexing(applicationId, request,
+ (original, cluster, type) -> original.withReady(cluster, type, now, speed, cause),
+ new StringJoiner(", ", "Reindexing document types ", " of application " + applicationId)
+ .setEmptyValue("Not reindexing any document types of application " + applicationId));
+ }
+
+ private HttpResponse updateReindexing(ApplicationId applicationId, HttpRequest request) {
+ String speedValue = request.getProperty("speed");
+ if (speedValue == null)
+ throw new IllegalArgumentException("request must specify 'speed' parameter");
+
+ return modifyReindexing(applicationId, request,
+ (original, cluster, type) -> original.withSpeed(cluster, type, Double.parseDouble(speedValue)),
+ new StringJoiner(", ", "Set reindexing speed to '" + speedValue + "' for document types ", " of application " + applicationId)
+ .setEmptyValue("Changed reindexing of no document types of application " + applicationId));
+ }
+
+ private interface ReindexingModification {
+ ApplicationReindexing apply(ApplicationReindexing original, String cluster, String type);
+ }
+
+ private HttpResponse modifyReindexing(ApplicationId applicationId, HttpRequest request,
+ ReindexingModification modification, StringJoiner messageBuilder) {
Model model = getActiveModelOrThrow(applicationId);
Map<String, Set<String>> documentTypes = model.documentTypesByCluster();
Map<String, Set<String>> indexedDocumentTypes = model.indexedDocumentTypesByCluster();
@@ -266,11 +301,8 @@ public class ApplicationHandler extends HttpHandler {
boolean indexedOnly = request.getBooleanProperty("indexedOnly");
Set<String> clusters = StringUtilities.split(request.getProperty("clusterId"));
Set<String> types = StringUtilities.split(request.getProperty("documentType"));
- double speed = Double.parseDouble(Objects.requireNonNullElse(request.getProperty("speed"), "1"));
- String cause = Objects.requireNonNullElse(request.getProperty("cause"), "reindexing for an unknown reason");
Map<String, Set<String>> reindexed = new TreeMap<>();
- Instant now = applicationRepository.clock().instant();
applicationRepository.modifyReindexing(applicationId, reindexing -> {
for (String cluster : clusters.isEmpty() ? documentTypes.keySet() : clusters) {
if ( ! documentTypes.containsKey(cluster))
@@ -283,7 +315,7 @@ public class ApplicationHandler extends HttpHandler {
String.join(", ", documentTypes.get(cluster)));
if ( ! indexedOnly || indexedDocumentTypes.get(cluster).contains(type)) {
- reindexing = reindexing.withReady(cluster, type, now, speed, cause);
+ reindexing = modification.apply(reindexing, cluster, type);
reindexed.computeIfAbsent(cluster, __ -> new TreeSet<>()).add(type);
}
}
@@ -292,13 +324,10 @@ public class ApplicationHandler extends HttpHandler {
});
return new MessageResponse(reindexed.entrySet().stream()
- .filter(cluster -> ! cluster.getValue().isEmpty())
- .map(cluster -> "[" + String.join(", ", cluster.getValue()) + "] in '" + cluster.getKey() + "'")
- .reduce(new StringJoiner(", ", "Reindexing document types ", " of application " + applicationId)
- .setEmptyValue("Not reindexing any document types of application " + applicationId),
- StringJoiner::add,
- StringJoiner::merge)
- .toString());
+ .filter(cluster -> ! cluster.getValue().isEmpty())
+ .map(cluster -> "[" + String.join(", ", cluster.getValue()) + "] in '" + cluster.getKey() + "'")
+ .reduce(messageBuilder, StringJoiner::add, StringJoiner::merge)
+ .toString());
}
public HttpResponse disableReindexing(ApplicationId applicationId) {
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java
index 14d28363aa6..258917527d1 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationReindexingTest.java
@@ -11,6 +11,7 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
/**
@@ -66,6 +67,26 @@ public class ApplicationReindexingTest {
"b", new Status(Instant.ofEpochMilli(2), 3, "test reindexing")),
reindexing.clusters().get("two").ready());
+ assertEquals(Map.of("a", new Status(Instant.ofEpochMilli(3), 2, "test reindexing"),
+ "b", new Status(Instant.ofEpochMilli(2), 0, "test reindexing")),
+ reindexing.withSpeed("two", "b", 0).clusters().get("two").ready());
+
+ assertEquals("no existing reindexing for cluster 'three'",
+ assertThrows(IllegalArgumentException.class, () -> reindexing.withSpeed("three", "a", 4))
+ .getMessage());
+
+ assertEquals("no existing reindexing for document type 'c' in cluster 'two'",
+ assertThrows(IllegalArgumentException.class, () -> reindexing.withSpeed("two", "c", 4))
+ .getMessage());
+
+ assertEquals("Initial reindexing speed must be in (0, 10], but was 0.0",
+ assertThrows(IllegalArgumentException.class, () -> reindexing.withReady("two", "b", Instant.EPOCH, 0, "no"))
+ .getMessage());
+
+ assertEquals("Reindexing speed must be in [0, 10], but was -1.0",
+ assertThrows(IllegalArgumentException.class, () -> reindexing.withSpeed("two", "b", -1))
+ .getMessage());
+
assertEquals(Map.of("b", 20L),
reindexing.clusters().get("two").pending());
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
index 9811f4826e5..01ddceb33a2 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
@@ -79,6 +79,7 @@ import static com.yahoo.container.jdisc.HttpRequest.createTestRequest;
import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
+import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
import static com.yahoo.test.json.JsonTestHelper.assertJsonEquals;
import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceListResponse;
import static com.yahoo.vespa.config.server.application.ConfigConvergenceChecker.ServiceResponse;
@@ -311,6 +312,11 @@ public class ApplicationHandlerTest {
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
+ clock.advance(Duration.ofSeconds(1));
+ reindex(applicationId, PUT, "?documentType=bar&speed=0", "{\"message\":\"Set reindexing speed to '0' for document types [bar] in 'boo', [bar] in 'foo' of application default.default\"}");
+ expected = expected.withSpeed("boo", "bar", 0)
+ .withSpeed("foo", "bar", 0);
+
reindexing(applicationId, DELETE, "{\"message\":\"Reindexing disabled\"}");
expected = expected.enabled(false);
assertEquals(expected,
@@ -333,8 +339,8 @@ public class ApplicationHandlerTest {
" }," +
" \"ready\": {" +
" \"bar\": {" +
- " \"readyMillis\": " + (now - 1000) + ", " +
- " \"speed\": 1.0," +
+ " \"readyMillis\": " + (now - 2000) + ", " +
+ " \"speed\": 0.0," +
" \"cause\": \"reindexing\"," +
" \"state\": \"pending\"" +
" }" +
@@ -344,19 +350,19 @@ public class ApplicationHandlerTest {
" \"pending\": {}," +
" \"ready\": {" +
" \"bar\": {" +
- " \"readyMillis\": " + now + ", " +
- " \"speed\": 0.1," +
+ " \"readyMillis\": " + (now - 1000) + ", " +
+ " \"speed\": 0.0," +
" \"cause\": \"reindexing\"," +
" \"state\": \"pending\"" +
" }," +
" \"bax\": {" +
- " \"readyMillis\": " + (now - 1000) + ", " +
+ " \"readyMillis\": " + (now - 2000) + ", " +
" \"speed\": 1.0," +
" \"cause\": \"reindexing\"," +
" \"state\": \"pending\"" +
" }," +
" \"baz\": {" +
- " \"readyMillis\": " + now + ", " +
+ " \"readyMillis\": " + (now - 1000) + ", " +
" \"speed\": 0.1," +
" \"cause\": \"reindexing\"," +
" \"state\": \"pending\"" +
@@ -427,11 +433,6 @@ public class ApplicationHandlerTest {
}
@Test
- public void testPutIsIllegal() throws IOException {
- assertNotAllowed(Method.PUT);
- }
-
- @Test
public void testFileDistributionStatus() throws Exception {
applicationRepository.deploy(testApp, prepareParams(applicationId));
Zone zone = Zone.defaultZone();
@@ -874,8 +875,12 @@ public class ApplicationHandlerTest {
}
private void reindex(ApplicationId application, String query, String message) throws IOException {
+ reindex(application, POST, query, message);
+ }
+
+ private void reindex(ApplicationId application, Method method, String query, String message) throws IOException {
String reindexUrl = toUrlPath(application, Zone.defaultZone(), true) + "/reindex" + query;
- assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, POST)), 200, message);
+ assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, method)), 200, message);
}
private void restart(ApplicationId application, Zone zone) throws IOException {