diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2024-05-29 14:35:48 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-29 14:35:48 +0200 |
commit | 7b965b2e11b1d53aac891dfd194232970d89adcf (patch) | |
tree | 38225772e2b6e93fc304afea222a6d1260d5d40a | |
parent | 8276854275b3daaf79d6774062bce6279981c4ba (diff) | |
parent | 5ea053a236acad993ef6a731efa129f257affb3c (diff) |
Merge pull request #31323 from vespa-engine/jonmv/feed-client-timeout-fix
Update HTTP request timeout parameter as time passes
9 files changed, 63 insertions, 42 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/OnnxMemoryStats.java b/config-model-api/src/main/java/com/yahoo/config/model/api/OnnxMemoryStats.java index c45d69f02cb..23cf6846e26 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/OnnxMemoryStats.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/OnnxMemoryStats.java @@ -35,7 +35,7 @@ public record OnnxMemoryStats(long vmSize, long vmRss, long mallocPeak, long mal } public static Path memoryStatsFilePath(Path modelPath) { - var fileName = modelPath.getRelative().replaceAll("[^\\w\\d\\$@_]", "_") + ".memory_stats"; + var fileName = modelPath.getRelative().replaceAll("[^\\w$@_]", "_") + ".memory_stats"; return ApplicationPackage.MODELS_GENERATED_REPLICATED_DIR.append(fileName); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index f9f477c9693..150375dd1c0 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -194,7 +194,7 @@ public class TenantRepository { this.configserverConfig = configserverConfig; this.curator = curator; this.metrics = metrics; - metricUpdater = metrics.getOrCreateMetricUpdater(Map.of()); + this.metricUpdater = metrics.getOrCreateMetricUpdater(Map.of()); this.zkCacheExecutor = zkCacheExecutor; this.zkApplicationWatcherExecutor = zkApplicationWatcherExecutor; this.zkSessionWatcherExecutor = zkSessionWatcherExecutor; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index 424481b6ef2..d5eab8e17af 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -58,6 +58,7 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { Compression compression = auto; URI proxy; Duration connectionTtl = Duration.ZERO; + LongSupplier nanoClock = System::nanoTime; public FeedClientBuilderImpl() { } @@ -251,6 +252,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { return this; } + FeedClientBuilderImpl setNanoClock(LongSupplier nanoClock) { + this.nanoClock = requireNonNull(nanoClock); + return this; + } + /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ @Override public FeedClient build() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index d12d72f7a70..8ee281fb38d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -55,6 +55,7 @@ class HttpFeedClient implements FeedClient { private final RequestStrategy requestStrategy; private final AtomicBoolean closed = new AtomicBoolean(); private final boolean speedTest; + private final LongSupplier nanoClock; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { this(builder, @@ -69,6 +70,7 @@ class HttpFeedClient implements FeedClient { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; + this.nanoClock = builder.nanoClock; verifyConnection(builder, clusterFactory); } @@ -111,11 +113,12 @@ class HttpFeedClient implements FeedClient { throw new IllegalStateException("Client is closed"); HttpRequest request = new HttpRequest(method, - getPath(documentId) + getQuery(params, speedTest), + getPath(documentId), + getQuery(params, speedTest), requestHeaders, operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? params.timeout().orElse(maxTimeout), - System::nanoTime); + nanoClock); CompletableFuture<Result> promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) @@ -137,11 +140,12 @@ class HttpFeedClient implements FeedClient { Instant start = Instant.now(); try (Cluster cluster = clusterFactory.create()) { HttpRequest request = new HttpRequest("POST", - getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), + getPath(DocumentId.of("feeder", "handshake", "dummy")), + getQuery(empty(), true), requestHeaders, null, Duration.ofSeconds(15), - System::nanoTime); + nanoClock); CompletableFuture<HttpResponse> future = new CompletableFuture<>(); cluster.dispatch(request, future); HttpResponse response = future.get(20, TimeUnit.SECONDS); @@ -312,7 +316,6 @@ class HttpFeedClient implements FeedClient { StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue(""); if (params.createIfNonExistent()) query.add("create=true"); params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition))); - params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms")); params.route().ifPresent(route -> query.add("route=" + encode(route))); params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel)); if (speedTest) query.add("dryRun=true"); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index fdd35b74c35..22f6eaa75a4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -10,15 +10,17 @@ class HttpRequest { private final String method; private final String path; + private final String query; private final Map<String, Supplier<String>> headers; private final byte[] body; private final Duration timeout; private final long deadlineNanos; private final LongSupplier nanoClock; - public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { + public HttpRequest(String method, String path, String query, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { this.method = method; this.path = path; + this.query = query; this.headers = headers; this.body = body; this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos(); @@ -30,8 +32,8 @@ class HttpRequest { return method; } - public String path() { - return path; + public String pathAndQuery() { + return path + (query.isEmpty() ? "?" : query + "&") + "timeout=" + Math.max(1, timeLeft().toMillis()) + "ms"; } public Map<String, Supplier<String>> headers() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 63d061d85d3..42079718115 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -36,6 +36,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; +import static java.util.logging.Level.INFO; import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; @@ -346,13 +347,15 @@ class HttpRequestStrategy implements RequestStrategy { public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { synchronized (monitor) { AtomicLong usedCounter = inflight; - Cluster usedCluster = delegate; usedCounter.incrementAndGet(); - delegate.dispatch(request, vessel); + Cluster usedCluster = delegate; + usedCluster.dispatch(request, vessel); vessel.whenComplete((__, ___) -> { synchronized (monitor) { - if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) + if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) { + log.log(INFO, "Closing old HTTP client"); usedCluster.close(); + } } }); } @@ -372,6 +375,7 @@ class HttpRequestStrategy implements RequestStrategy { void reset() throws IOException { synchronized (monitor) { + log.log(INFO, "Replacing underlying HTTP client to attempt recovery"); delegate = clusterFactory.create(); inflight = new AtomicLong(0); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index 6f438e33238..28e5b5d0a21 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -90,7 +90,7 @@ class JettyCluster implements Cluster { vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'")); return; } - Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) + Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.pathAndQuery())) .version(HttpVersion.HTTP_2) .method(HttpMethod.fromString(req.method())) .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index cec070c06a6..9afaeed8062 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -43,7 +43,9 @@ class HttpFeedClientTest { @Override public void await() { throw new UnsupportedOperationException(); } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } - FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true), + FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setDryrun(true) + .setNanoClock(() -> 0), () -> new DryrunCluster(), new MockRequestStrategy()); @@ -51,8 +53,8 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms", + request.pathAndQuery()); assertEquals("PUT", request.method()); assertEquals("json", new String(request.body(), UTF_8)); @@ -83,8 +85,8 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0?tracelevel=1", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?tracelevel=1&timeout=900000ms", + request.pathAndQuery()); assertEquals("DELETE", request.method()); assertNull(request.body()); @@ -145,8 +147,8 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&route=route&timeout=5000ms", + request.pathAndQuery()); assertEquals("json", new String(request.body(), UTF_8)); HttpResponse response = HttpResponse.of(502, @@ -183,8 +185,8 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms", + request.pathAndQuery()); assertEquals("json", new String(request.body(), UTF_8)); HttpResponse response = HttpResponse.of(500, @@ -227,7 +229,7 @@ class HttpFeedClientTest { try { assertNull(request.body()); assertEquals("POST", request.method()); - assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true", request.path()); + assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true&timeout=15000ms", request.pathAndQuery()); vessel.complete(response.get()); } catch (Throwable t) { @@ -238,19 +240,23 @@ class HttpFeedClientTest { // Old server, and speed-test. assertEquals("server does not support speed test; upgrade to a newer version", assertThrows(FeedException.class, - () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true), + () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0) + .setSpeedTest(true), () -> cluster, null)) .getMessage()); // Old server. - new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), + new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0), () -> cluster, null); // New server. response.set(okResponse); - new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), + new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0), () -> cluster, null); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index e067a103d59..a95f859df4d 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -44,7 +44,7 @@ class HttpRequestStrategyTest { @Test void testConcurrency() throws IOException { int documents = 1 << 16; - HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); + HttpRequest request = new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); @@ -102,7 +102,7 @@ class HttpRequestStrategyTest { DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(180), nowNanos::get); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -146,8 +146,8 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. nowNanos.set(4_000_000_000L); @@ -168,7 +168,7 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -184,7 +184,7 @@ class HttpRequestStrategyTest { vessel.completeExceptionally(new IOException("retry me")); }); expected = assertThrows(ExecutionException.class, - () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get()); + () -> strategy.enqueue(id1, new HttpRequest("POST", "/", "", null, null, Duration.ofMillis(100), nowNanos::get)).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); assertEquals(15, strategy.stats().requests()); @@ -223,15 +223,15 @@ class HttpRequestStrategyTest { // First operation fails, second remains in flight, and third fails. clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get(); Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>(); clusters.get(0).expect((__, vessel) -> { try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); } }); - CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)); + CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)); CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null); clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get(); // Time advances, and the circuit breaker half-opens. assertEquals(CLOSED, breaker.state()); @@ -241,11 +241,11 @@ class HttpRequestStrategyTest { // It's indeterminate which cluster gets the next request, but the second should get the next one after that. clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code()); clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called"))); clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code()); assertFalse(clusters.get(0).closed.get()); assertFalse(clusters.get(1).closed.get()); @@ -276,10 +276,10 @@ class HttpRequestStrategyTest { DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); DocumentId id5 = DocumentId.of("ns", "type", "5"); - HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); - HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest failing = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest partial = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest blocking = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); |