summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2024-05-21 17:08:57 +0200
committerGitHub <noreply@github.com>2024-05-21 17:08:57 +0200
commit3d65e159d42ff317ca52107960c19af1a431afd1 (patch)
treef489d0f853146bb67ce28e7f4d90716c66fd75b4
parenta1493fc4ba682925e3c7337b98b58adf8dda9f83 (diff)
parent02850df4460cbfa744cf5d36df22be65c5921045 (diff)
Merge pull request #31265 from vespa-engine/revert-31233-jonmv/reset-jetty-client-on-circuit-breaker-trip
Revert "Replace Jetty client when tripping circuit breaker" MERGEOK
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java23
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java77
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java13
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java67
4 files changed, 21 insertions, 159 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index d12d72f7a70..5eb611160cc 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Duration;
@@ -57,19 +56,18 @@ class HttpFeedClient implements FeedClient {
private final boolean speedTest;
HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
- this(builder,
- builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder));
+ this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder));
}
- HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
- this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory));
+ HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) {
+ this(builder, cluster, new HttpRequestStrategy(builder, cluster));
}
- HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException {
+ HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) {
this.requestHeaders = new HashMap<>(builder.requestHeaders);
this.requestStrategy = requestStrategy;
this.speedTest = builder.speedTest;
- verifyConnection(builder, clusterFactory);
+ verifyConnection(builder, cluster);
}
@Override
@@ -133,9 +131,9 @@ class HttpFeedClient implements FeedClient {
return promise;
}
- private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
+ private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) {
Instant start = Instant.now();
- try (Cluster cluster = clusterFactory.create()) {
+ try {
HttpRequest request = new HttpRequest("POST",
getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
requestHeaders,
@@ -319,11 +317,4 @@ class HttpFeedClient implements FeedClient {
return query.toString();
}
- /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */
- interface ClusterFactory {
-
- Cluster create() throws IOException;
-
- }
-
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index 5fe59647038..f699651634a 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -8,14 +8,10 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy;
import ai.vespa.feed.client.FeedException;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
-import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory;
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -25,7 +21,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -71,14 +66,10 @@ class HttpRequestStrategy implements RequestStrategy {
thread.setDaemon(true);
return thread;
});
- // TODO jonmv: remove if this has no effect
- private final ResettableCluster resettableCluster;
- private final AtomicBoolean reset = new AtomicBoolean(false);
- HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
+ HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) {
this.throttler = new DynamicThrottler(builder);
- this.resettableCluster = new ResettableCluster(clusterFactory);
- this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster;
+ this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
@@ -101,12 +92,6 @@ class HttpRequestStrategy implements RequestStrategy {
try {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
-
- if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true))
- resettableCluster.reset();
- else if (breaker.state() == CLOSED)
- reset.set(false);
-
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1);
}
@@ -185,10 +170,6 @@ class HttpRequestStrategy implements RequestStrategy {
return retry(request, attempt);
}
- if (response.code() >= 500) { // Server errors may indicate something wrong with the server.
- breaker.failure(response);
- }
-
return false;
}
@@ -325,58 +306,4 @@ class HttpRequestStrategy implements RequestStrategy {
}
}
- /**
- * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes
- * renders a client instance permanently unusable. If this is the case, replacing the client altogether
- * should allow the feeder to start working again, when it wouldn't otherwise be able to.
- */
- private static class ResettableCluster implements Cluster {
-
- private final Object monitor = new Object();
- private final ClusterFactory clusterFactory;
- private AtomicLong inflight = new AtomicLong(0);
- private Cluster delegate;
-
- ResettableCluster(ClusterFactory clusterFactory) throws IOException {
- this.clusterFactory = clusterFactory;
- this.delegate = clusterFactory.create();
- }
-
- @Override
- public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
- synchronized (monitor) {
- AtomicLong usedCounter = inflight;
- Cluster usedCluster = delegate;
- usedCounter.incrementAndGet();
- delegate.dispatch(request, vessel);
- vessel.whenComplete((__, ___) -> {
- synchronized (monitor) {
- if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate)
- usedCluster.close();
- }
- });
- }
- }
-
- @Override
- public void close() {
- synchronized (monitor) {
- delegate.close();
- }
- }
-
- @Override
- public OperationStats stats() {
- return delegate.stats();
- }
-
- void reset() throws IOException {
- synchronized (monitor) {
- delegate = clusterFactory.create();
- inflight = new AtomicLong(0);
- }
- }
-
- }
-
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index cec070c06a6..14ade35825f 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -11,7 +11,6 @@ import ai.vespa.feed.client.Result;
import ai.vespa.feed.client.ResultException;
import org.junit.jupiter.api.Test;
-import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
@@ -33,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpFeedClientTest {
@Test
- void testFeeding() throws ExecutionException, InterruptedException, IOException {
+ void testFeeding() throws ExecutionException, InterruptedException {
DocumentId id = DocumentId.of("ns", "type", "0");
AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@@ -44,7 +43,7 @@ class HttpFeedClientTest {
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
}
FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true),
- () -> new DryrunCluster(),
+ new DryrunCluster(),
new MockRequestStrategy());
// Update is a PUT, and 200 OK is a success.
@@ -212,7 +211,7 @@ class HttpFeedClientTest {
}
@Test
- void testHandshake() throws IOException {
+ void testHandshake() {
// dummy:123 does not exist, and results in a host-not-found exception.
FeedException exception = assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
@@ -239,19 +238,19 @@ class HttpFeedClientTest {
assertEquals("server does not support speed test; upgrade to a newer version",
assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true),
- () -> cluster,
+ cluster,
null))
.getMessage());
// Old server.
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- () -> cluster,
+ cluster,
null);
// New server.
response.set(okResponse);
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- () -> cluster,
+ cluster,
null);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index f313f08426c..b9ab4e481ac 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -17,13 +17,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,7 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpRequestStrategyTest {
@Test
- void testConcurrency() throws IOException {
+ void testConcurrency() {
int documents = 1 << 16;
HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
@@ -52,7 +50,7 @@ class HttpRequestStrategyTest {
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setConnectionsPerEndpoint(1 << 10)
.setMaxStreamPerConnection(1 << 12),
- () -> cluster);
+ cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
@@ -84,7 +82,7 @@ class HttpRequestStrategyTest {
}
@Test()
- void testRetries() throws ExecutionException, InterruptedException, IOException {
+ void testRetries() throws ExecutionException, InterruptedException {
int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
@@ -97,7 +95,7 @@ class HttpRequestStrategyTest {
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(1)
.setMaxStreamPerConnection(minStreams),
- () -> cluster);
+ cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
@@ -212,54 +210,7 @@ class HttpRequestStrategyTest {
}
@Test
- void testResettingCluster() throws ExecutionException, InterruptedException, IOException {
- List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster());
- AtomicLong now = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null);
- HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
- .setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1),
- clusters.iterator()::next);
-
- // First operation fails, second remains in flight, and third fails.
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
- Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>();
- clusters.get(0).expect((__, vessel) -> {
- try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); }
- });
- CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get));
- CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null);
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
-
- // Time advances, and the circuit breaker half-opens.
- assertEquals(CLOSED, breaker.state());
- now.addAndGet(2000);
- assertEquals(HALF_OPEN, breaker.state());
-
- // It's indeterminate which cluster gets the next request, but the second should get the next one after that.
- clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
- assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
-
- clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called")));
- clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
- assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
-
- assertFalse(clusters.get(0).closed.get());
- assertFalse(clusters.get(1).closed.get());
- secondVessel.complete(HttpResponse.of(504, null));
- assertEquals(504, secondResponse.get().code());
- assertTrue(clusters.get(0).closed.get());
- assertFalse(clusters.get(1).closed.get());
- strategy.await();
- strategy.destroy();
- assertTrue(clusters.get(1).closed.get());
- }
-
- @Test
- void testShutdown() throws IOException {
+ void testShutdown() {
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
@@ -269,7 +220,7 @@ class HttpRequestStrategyTest {
})
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
- () -> cluster);
+ cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
@@ -346,7 +297,6 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
- final AtomicBoolean closed = new AtomicBoolean(false);
void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
@@ -357,11 +307,6 @@ class HttpRequestStrategyTest {
dispatch.get().accept(request, vessel);
}
- @Override
- public void close() {
- closed.set(true);
- }
-
}
}