diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2024-05-21 15:28:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-21 15:28:20 +0200 |
commit | e92a0507aca9ab2dcde8d412be5d680774847c4d (patch) | |
tree | 1c673c0cbd3b79b5e0730fe0ad6a731110b67ddd | |
parent | 334cb3b9ebf0f6811bb42d82bf64437873a950df (diff) | |
parent | a2c6c2ecf05aebc770abab748751d5c38958b012 (diff) |
Merge pull request #31233 from vespa-engine/jonmv/reset-jetty-client-on-circuit-breaker-trip
Replace Jetty client when tripping circuit breaker
4 files changed, 159 insertions, 21 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index 5eb611160cc..d12d72f7a70 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; +import java.io.UncheckedIOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.time.Duration; @@ -56,18 +57,19 @@ class HttpFeedClient implements FeedClient { private final boolean speedTest; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { - this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder)); + this(builder, + builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder)); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) { - this(builder, cluster, new HttpRequestStrategy(builder, cluster)); + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory)); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) { + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; - verifyConnection(builder, cluster); + verifyConnection(builder, clusterFactory); } @Override @@ -131,9 +133,9 @@ class HttpFeedClient implements FeedClient { return promise; } - private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) { + private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { Instant start = Instant.now(); - try { + try (Cluster cluster = clusterFactory.create()) { HttpRequest request = new HttpRequest("POST", getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), requestHeaders, @@ -317,4 +319,11 @@ class HttpFeedClient implements FeedClient { return query.toString(); } + /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */ + interface ClusterFactory { + + Cluster create() throws IOException; + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index f699651634a..5fe59647038 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -8,10 +8,14 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy; import ai.vespa.feed.client.FeedException; import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -21,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -66,10 +71,14 @@ class HttpRequestStrategy implements RequestStrategy { thread.setDaemon(true); return thread; }); + // TODO jonmv: remove if this has no effect + private final ResettableCluster resettableCluster; + private final AtomicBoolean reset = new AtomicBoolean(false); - HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { + HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { this.throttler = new DynamicThrottler(builder); - this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; + this.resettableCluster = new ResettableCluster(clusterFactory); + this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; @@ -92,6 +101,12 @@ class HttpRequestStrategy implements RequestStrategy { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); + + if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true)) + resettableCluster.reset(); + else if (breaker.state() == CLOSED) + reset.set(false); + // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1); } @@ -170,6 +185,10 @@ class HttpRequestStrategy implements RequestStrategy { return retry(request, attempt); } + if (response.code() >= 500) { // Server errors may indicate something wrong with the server. + breaker.failure(response); + } + return false; } @@ -306,4 +325,58 @@ class HttpRequestStrategy implements RequestStrategy { } } + /** + * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes + * renders a client instance permanently unusable. If this is the case, replacing the client altogether + * should allow the feeder to start working again, when it wouldn't otherwise be able to. + */ + private static class ResettableCluster implements Cluster { + + private final Object monitor = new Object(); + private final ClusterFactory clusterFactory; + private AtomicLong inflight = new AtomicLong(0); + private Cluster delegate; + + ResettableCluster(ClusterFactory clusterFactory) throws IOException { + this.clusterFactory = clusterFactory; + this.delegate = clusterFactory.create(); + } + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + synchronized (monitor) { + AtomicLong usedCounter = inflight; + Cluster usedCluster = delegate; + usedCounter.incrementAndGet(); + delegate.dispatch(request, vessel); + vessel.whenComplete((__, ___) -> { + synchronized (monitor) { + if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) + usedCluster.close(); + } + }); + } + } + + @Override + public void close() { + synchronized (monitor) { + delegate.close(); + } + } + + @Override + public OperationStats stats() { + return delegate.stats(); + } + + void reset() throws IOException { + synchronized (monitor) { + delegate = clusterFactory.create(); + inflight = new AtomicLong(0); + } + } + + } + } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index 14ade35825f..cec070c06a6 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -11,6 +11,7 @@ import ai.vespa.feed.client.Result; import ai.vespa.feed.client.ResultException; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.List; @@ -32,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpFeedClientTest { @Test - void testFeeding() throws ExecutionException, InterruptedException { + void testFeeding() throws ExecutionException, InterruptedException, IOException { DocumentId id = DocumentId.of("ns", "type", "0"); AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @@ -43,7 +44,7 @@ class HttpFeedClientTest { @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true), - new DryrunCluster(), + () -> new DryrunCluster(), new MockRequestStrategy()); // Update is a PUT, and 200 OK is a success. @@ -211,7 +212,7 @@ class HttpFeedClientTest { } @Test - void testHandshake() { + void testHandshake() throws IOException { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); @@ -238,19 +239,19 @@ class HttpFeedClientTest { assertEquals("server does not support speed test; upgrade to a newer version", assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true), - cluster, + () -> cluster, null)) .getMessage()); // Old server. new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - cluster, + () -> cluster, null); // New server. response.set(okResponse); new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - cluster, + () -> cluster, null); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index b9ab4e481ac..f313f08426c 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -17,11 +17,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -40,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpRequestStrategyTest { @Test - void testConcurrency() { + void testConcurrency() throws IOException { int documents = 1 << 16; HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); @@ -50,7 +52,7 @@ class HttpRequestStrategyTest { HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setConnectionsPerEndpoint(1 << 10) .setMaxStreamPerConnection(1 << 12), - cluster); + () -> cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { @@ -82,7 +84,7 @@ class HttpRequestStrategyTest { } @Test() - void testRetries() throws ExecutionException, InterruptedException { + void testRetries() throws ExecutionException, InterruptedException, IOException { int minStreams = 2; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong nowNanos = new AtomicLong(0); @@ -95,7 +97,7 @@ class HttpRequestStrategyTest { .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(1) .setMaxStreamPerConnection(minStreams), - cluster); + () -> cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); @@ -210,7 +212,54 @@ class HttpRequestStrategyTest { } @Test - void testShutdown() { + void testResettingCluster() throws ExecutionException, InterruptedException, IOException { + List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster()); + AtomicLong now = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null); + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1), + clusters.iterator()::next); + + // First operation fails, second remains in flight, and third fails. + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); + strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>(); + clusters.get(0).expect((__, vessel) -> { + try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); } + }); + CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)); + CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null); + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); + + // Time advances, and the circuit breaker half-opens. + assertEquals(CLOSED, breaker.state()); + now.addAndGet(2000); + assertEquals(HALF_OPEN, breaker.state()); + + // It's indeterminate which cluster gets the next request, but the second should get the next one after that. + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + + clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called"))); + clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); + assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); + + assertFalse(clusters.get(0).closed.get()); + assertFalse(clusters.get(1).closed.get()); + secondVessel.complete(HttpResponse.of(504, null)); + assertEquals(504, secondResponse.get().code()); + assertTrue(clusters.get(0).closed.get()); + assertFalse(clusters.get(1).closed.get()); + strategy.await(); + strategy.destroy(); + assertTrue(clusters.get(1).closed.get()); + } + + @Test + void testShutdown() throws IOException { MockCluster cluster = new MockCluster(); AtomicLong nowNanos = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); @@ -220,7 +269,7 @@ class HttpRequestStrategyTest { }) .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. - cluster); + () -> cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); @@ -297,6 +346,7 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); + final AtomicBoolean closed = new AtomicBoolean(false); void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { dispatch.set(expected); @@ -307,6 +357,11 @@ class HttpRequestStrategyTest { dispatch.get().accept(request, vessel); } + @Override + public void close() { + closed.set(true); + } + } } |