diff options
author | Jon Marius Venstad <jonmv@users.noreply.github.com> | 2024-05-21 17:08:07 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-21 17:08:07 +0200 |
commit | 02850df4460cbfa744cf5d36df22be65c5921045 (patch) | |
tree | f489d0f853146bb67ce28e7f4d90716c66fd75b4 /vespa-feed-client | |
parent | a1493fc4ba682925e3c7337b98b58adf8dda9f83 (diff) |
Revert "Replace Jetty client when tripping circuit breaker"
Diffstat (limited to 'vespa-feed-client')
4 files changed, 21 insertions, 159 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index d12d72f7a70..5eb611160cc 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; -import java.io.UncheckedIOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.time.Duration; @@ -57,19 +56,18 @@ class HttpFeedClient implements FeedClient { private final boolean speedTest; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { - this(builder, - builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder)); + this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder)); } - HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { - this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory)); + HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) { + this(builder, cluster, new HttpRequestStrategy(builder, cluster)); } - HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException { + HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; - verifyConnection(builder, clusterFactory); + verifyConnection(builder, cluster); } @Override @@ -133,9 +131,9 @@ class HttpFeedClient implements FeedClient { return promise; } - private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) { Instant start = Instant.now(); - try (Cluster cluster = clusterFactory.create()) { + try { HttpRequest request = new HttpRequest("POST", getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), requestHeaders, @@ -319,11 +317,4 @@ class HttpFeedClient implements FeedClient { return query.toString(); } - /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */ - interface ClusterFactory { - - Cluster create() throws IOException; - - } - } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 5fe59647038..f699651634a 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -8,14 +8,10 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy; import ai.vespa.feed.client.FeedException; import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; -import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; import java.util.Map; import java.util.Queue; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -25,7 +21,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -71,14 +66,10 @@ class HttpRequestStrategy implements RequestStrategy { thread.setDaemon(true); return thread; }); - // TODO jonmv: remove if this has no effect - private final ResettableCluster resettableCluster; - private final AtomicBoolean reset = new AtomicBoolean(false); - HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { this.throttler = new DynamicThrottler(builder); - this.resettableCluster = new ResettableCluster(clusterFactory); - this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster; + this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; @@ -101,12 +92,6 @@ class HttpRequestStrategy implements RequestStrategy { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); - - if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true)) - resettableCluster.reset(); - else if (breaker.state() == CLOSED) - reset.set(false); - // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1); } @@ -185,10 +170,6 @@ class HttpRequestStrategy implements RequestStrategy { return retry(request, attempt); } - if (response.code() >= 500) { // Server errors may indicate something wrong with the server. - breaker.failure(response); - } - return false; } @@ -325,58 +306,4 @@ class HttpRequestStrategy implements RequestStrategy { } } - /** - * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes - * renders a client instance permanently unusable. If this is the case, replacing the client altogether - * should allow the feeder to start working again, when it wouldn't otherwise be able to. - */ - private static class ResettableCluster implements Cluster { - - private final Object monitor = new Object(); - private final ClusterFactory clusterFactory; - private AtomicLong inflight = new AtomicLong(0); - private Cluster delegate; - - ResettableCluster(ClusterFactory clusterFactory) throws IOException { - this.clusterFactory = clusterFactory; - this.delegate = clusterFactory.create(); - } - - @Override - public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { - synchronized (monitor) { - AtomicLong usedCounter = inflight; - Cluster usedCluster = delegate; - usedCounter.incrementAndGet(); - delegate.dispatch(request, vessel); - vessel.whenComplete((__, ___) -> { - synchronized (monitor) { - if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) - usedCluster.close(); - } - }); - } - } - - @Override - public void close() { - synchronized (monitor) { - delegate.close(); - } - } - - @Override - public OperationStats stats() { - return delegate.stats(); - } - - void reset() throws IOException { - synchronized (monitor) { - delegate = clusterFactory.create(); - inflight = new AtomicLong(0); - } - } - - } - } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index cec070c06a6..14ade35825f 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -11,7 +11,6 @@ import ai.vespa.feed.client.Result; import ai.vespa.feed.client.ResultException; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.List; @@ -33,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpFeedClientTest { @Test - void testFeeding() throws ExecutionException, InterruptedException, IOException { + void testFeeding() throws ExecutionException, InterruptedException { DocumentId id = DocumentId.of("ns", "type", "0"); AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @@ -44,7 +43,7 @@ class HttpFeedClientTest { @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true), - () -> new DryrunCluster(), + new DryrunCluster(), new MockRequestStrategy()); // Update is a PUT, and 200 OK is a success. @@ -212,7 +211,7 @@ class HttpFeedClientTest { } @Test - void testHandshake() throws IOException { + void testHandshake() { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); @@ -239,19 +238,19 @@ class HttpFeedClientTest { assertEquals("server does not support speed test; upgrade to a newer version", assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true), - () -> cluster, + cluster, null)) .getMessage()); // Old server. new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - () -> cluster, + cluster, null); // New server. response.set(okResponse); new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - () -> cluster, + cluster, null); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index f313f08426c..b9ab4e481ac 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -17,13 +17,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -42,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpRequestStrategyTest { @Test - void testConcurrency() throws IOException { + void testConcurrency() { int documents = 1 << 16; HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); @@ -52,7 +50,7 @@ class HttpRequestStrategyTest { HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setConnectionsPerEndpoint(1 << 10) .setMaxStreamPerConnection(1 << 12), - () -> cluster); + cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { @@ -84,7 +82,7 @@ class HttpRequestStrategyTest { } @Test() - void testRetries() throws ExecutionException, InterruptedException, IOException { + void testRetries() throws ExecutionException, InterruptedException { int minStreams = 2; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong nowNanos = new AtomicLong(0); @@ -97,7 +95,7 @@ class HttpRequestStrategyTest { .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(1) .setMaxStreamPerConnection(minStreams), - () -> cluster); + cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); @@ -212,54 +210,7 @@ class HttpRequestStrategyTest { } @Test - void testResettingCluster() throws ExecutionException, InterruptedException, IOException { - List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster()); - AtomicLong now = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null); - HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1), - clusters.iterator()::next); - - // First operation fails, second remains in flight, and third fails. - clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); - Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>(); - clusters.get(0).expect((__, vessel) -> { - try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); } - }); - CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)); - CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null); - clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get(); - - // Time advances, and the circuit breaker half-opens. - assertEquals(CLOSED, breaker.state()); - now.addAndGet(2000); - assertEquals(HALF_OPEN, breaker.state()); - - // It's indeterminate which cluster gets the next request, but the second should get the next one after that. - clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); - assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); - - clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called"))); - clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); - assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code()); - - assertFalse(clusters.get(0).closed.get()); - assertFalse(clusters.get(1).closed.get()); - secondVessel.complete(HttpResponse.of(504, null)); - assertEquals(504, secondResponse.get().code()); - assertTrue(clusters.get(0).closed.get()); - assertFalse(clusters.get(1).closed.get()); - strategy.await(); - strategy.destroy(); - assertTrue(clusters.get(1).closed.get()); - } - - @Test - void testShutdown() throws IOException { + void testShutdown() { MockCluster cluster = new MockCluster(); AtomicLong nowNanos = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); @@ -269,7 +220,7 @@ class HttpRequestStrategyTest { }) .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. - () -> cluster); + cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); @@ -346,7 +297,6 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); - final AtomicBoolean closed = new AtomicBoolean(false); void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { dispatch.set(expected); @@ -357,11 +307,6 @@ class HttpRequestStrategyTest { dispatch.get().accept(request, vessel); } - @Override - public void close() { - closed.set(true); - } - } } |