diff options
author | jonmv <venstad@gmail.com> | 2024-05-16 23:28:38 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2024-05-16 23:28:38 +0200 |
commit | cd3b7e5701c81844816d3f1376329e40657e8e07 (patch) | |
tree | 901568d46d9fde911b6ca68006eaeb5466709014 /vespa-feed-client | |
parent | d7954c1b84d26b7f6b429159c3d9b956dc39eaef (diff) |
Replace Jetty client when tripping circuit breaker
Diffstat (limited to 'vespa-feed-client')
4 files changed, 138 insertions, 19 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index f876c4efade..186e3666889 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; +import java.io.UncheckedIOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.time.Duration; @@ -56,18 +57,20 @@ class HttpFeedClient implements FeedClient { private final boolean speedTest; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { - this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder)); + this(builder, + builder.dryrun ? () -> new DryrunCluster() + : () -> { try { return new JettyCluster(builder); } catch (IOException e) { throw new UncheckedIOException(e); } }); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) { - this(builder, cluster, new HttpRequestStrategy(builder, cluster)); + HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) { + this(builder, clusters, new HttpRequestStrategy(builder, clusters)); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) { + HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters, RequestStrategy requestStrategy) { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; - verifyConnection(builder, cluster); + verifyConnection(builder, clusters); } @Override @@ -131,9 +134,9 @@ class HttpFeedClient implements FeedClient { return promise; } - private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) { + private void verifyConnection(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) { Instant start = Instant.now(); - try { + try (Cluster cluster = clusters.get()) { HttpRequest request = new HttpRequest("POST", getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), requestHeaders, diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index f699651634a..178d6b6809c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -10,6 +10,8 @@ import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Map; import java.util.Queue; import java.util.concurrent.CompletableFuture; @@ -21,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -66,10 +69,14 @@ class HttpRequestStrategy implements RequestStrategy { thread.setDaemon(true); return thread; }); + // TODO jonmv: remove if this has no effect + private final ResettableCluster resettableCluster; + private final AtomicBoolean reset = new AtomicBoolean(false); - HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { + HttpRequestStrategy(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) { this.throttler = new DynamicThrottler(builder); - this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; + this.resettableCluster = new ResettableCluster(clusters); + this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; @@ -92,6 +99,12 @@ class HttpRequestStrategy implements RequestStrategy { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); + + if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true)) + resettableCluster.reset(); + else if (breaker.state() == CLOSED) + reset.set(false); + // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1); } @@ -170,6 +183,10 @@ class HttpRequestStrategy implements RequestStrategy { return retry(request, attempt); } + if (response.code() >= 500) { // Server errors may indicate something wrong with the server. + breaker.failure(response); + } + return false; } @@ -306,4 +323,51 @@ class HttpRequestStrategy implements RequestStrategy { } } + /** + * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes + * renders a client instance permanently unusable. If this is the case, replacing the client altogether + * should allow the feeder to start working again, when it wouldn't otherwise be able to. + */ + private static class ResettableCluster implements Cluster { + + private final Object monitor = new Object(); + private final Deque<CompletableFuture<?>> inflight = new ArrayDeque<>(); + private final Supplier<Cluster> delegates; + private Cluster delegate; + + ResettableCluster(Supplier<Cluster> delegates) { + this.delegates = delegates; + this.delegate = delegates.get(); + } + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + inflight.add(vessel); + delegate.dispatch(request, vessel); + } + + @Override + public void close() { + synchronized (monitor) { + delegate.close(); + } + } + + @Override + public OperationStats stats() { + return delegate.stats(); + } + + void reset() { + synchronized (monitor) { + Cluster obsolete = delegate; + CompletableFuture.allOf(inflight.toArray(CompletableFuture[]::new)) + .whenComplete((__, ___) -> obsolete.close()); + inflight.clear(); + delegate = delegates.get(); + } + } + + } + } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index 14ade35825f..0f1d7180c4c 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -43,7 +43,7 @@ class HttpFeedClientTest { @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true), - new DryrunCluster(), + () -> new DryrunCluster(), new MockRequestStrategy()); // Update is a PUT, and 200 OK is a success. @@ -238,19 +238,19 @@ class HttpFeedClientTest { assertEquals("server does not support speed test; upgrade to a newer version", assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true), - cluster, + () -> cluster, null)) .getMessage()); // Old server. new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - cluster, + () -> cluster, null); // New server. response.set(okResponse); new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - cluster, + () -> cluster, null); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index b06971ea0b1..3751f6d7af2 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -11,20 +11,19 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.URI; -import java.time.Clock; import java.time.Duration; -import java.time.Instant; -import java.time.ZoneId; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -53,7 +52,7 @@ class HttpRequestStrategyTest { HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setConnectionsPerEndpoint(1 << 10) .setMaxStreamPerConnection(1 << 12), - cluster); + () -> cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { @@ -98,7 +97,7 @@ class HttpRequestStrategyTest { .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(1) .setMaxStreamPerConnection(minStreams), - cluster); + () -> cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); @@ -213,6 +212,53 @@ class HttpRequestStrategyTest { } @Test + void testResettingCluster() throws ExecutionException, InterruptedException { + List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster()); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null); + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1), + clusters.iterator()::next); + + // First operation fails, second remains in flight, and third fails. + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); + strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>(); + clusters.get(0).expect((__, vessel) -> { + try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); } + }); + CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)); + CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null); + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + + // Time advances, and the circuit breaker half-opens. + assertEquals(CLOSED, breaker.state()); + now.addAndGet(2000); + assertEquals(HALF_OPEN, breaker.state()); + + // It's indeterminate which cluster gets the next request, but the second should get the next one after that. + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + + clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called"))); + clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); + assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + + assertFalse(clusters.get(0).closed.get()); + assertFalse(clusters.get(1).closed.get()); + secondVessel.complete(HttpResponse.of(504, null)); + assertEquals(504, secondResponse.get().code()); + assertTrue(clusters.get(0).closed.get()); + assertFalse(clusters.get(1).closed.get()); + strategy.await(); + strategy.destroy(); + assertTrue(clusters.get(1).closed.get()); + } + + @Test void testShutdown() { MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); @@ -223,7 +269,7 @@ class HttpRequestStrategyTest { }) .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. - cluster); + () -> cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); @@ -300,6 +346,7 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); + final AtomicBoolean closed = new AtomicBoolean(false); void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { dispatch.set(expected); @@ -310,6 +357,11 @@ class HttpRequestStrategyTest { dispatch.get().accept(request, vessel); } + @Override + public void close() { + closed.set(true); + } + } } |