summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <jonmv@users.noreply.github.com>2024-05-21 15:28:20 +0200
committerGitHub <noreply@github.com>2024-05-21 15:28:20 +0200
commite92a0507aca9ab2dcde8d412be5d680774847c4d (patch)
tree1c673c0cbd3b79b5e0730fe0ad6a731110b67ddd /vespa-feed-client
parent334cb3b9ebf0f6811bb42d82bf64437873a950df (diff)
parenta2c6c2ecf05aebc770abab748751d5c38958b012 (diff)
Merge pull request #31233 from vespa-engine/jonmv/reset-jetty-client-on-circuit-breaker-trip
Replace Jetty client when tripping circuit breaker
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java23
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java77
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java13
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java67
4 files changed, 159 insertions, 21 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index 5eb611160cc..d12d72f7a70 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Duration;
@@ -56,18 +57,19 @@ class HttpFeedClient implements FeedClient {
private final boolean speedTest;
HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
- this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder));
+ this(builder,
+ builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder));
}
- HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) {
- this(builder, cluster, new HttpRequestStrategy(builder, cluster));
+ HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
+ this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory));
}
- HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) {
+ HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException {
this.requestHeaders = new HashMap<>(builder.requestHeaders);
this.requestStrategy = requestStrategy;
this.speedTest = builder.speedTest;
- verifyConnection(builder, cluster);
+ verifyConnection(builder, clusterFactory);
}
@Override
@@ -131,9 +133,9 @@ class HttpFeedClient implements FeedClient {
return promise;
}
- private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) {
+ private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
Instant start = Instant.now();
- try {
+ try (Cluster cluster = clusterFactory.create()) {
HttpRequest request = new HttpRequest("POST",
getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
requestHeaders,
@@ -317,4 +319,11 @@ class HttpFeedClient implements FeedClient {
return query.toString();
}
+ /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */
+ interface ClusterFactory {
+
+ Cluster create() throws IOException;
+
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index f699651634a..5fe59647038 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -8,10 +8,14 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy;
import ai.vespa.feed.client.FeedException;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
+import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -21,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -66,10 +71,14 @@ class HttpRequestStrategy implements RequestStrategy {
thread.setDaemon(true);
return thread;
});
+ // TODO jonmv: remove if this has no effect
+ private final ResettableCluster resettableCluster;
+ private final AtomicBoolean reset = new AtomicBoolean(false);
- HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) {
+ HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
this.throttler = new DynamicThrottler(builder);
- this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster;
+ this.resettableCluster = new ResettableCluster(clusterFactory);
+ this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
@@ -92,6 +101,12 @@ class HttpRequestStrategy implements RequestStrategy {
try {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
+
+ if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true))
+ resettableCluster.reset();
+ else if (breaker.state() == CLOSED)
+ reset.set(false);
+
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1);
}
@@ -170,6 +185,10 @@ class HttpRequestStrategy implements RequestStrategy {
return retry(request, attempt);
}
+ if (response.code() >= 500) { // Server errors may indicate something wrong with the server.
+ breaker.failure(response);
+ }
+
return false;
}
@@ -306,4 +325,58 @@ class HttpRequestStrategy implements RequestStrategy {
}
}
+ /**
+ * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes
+ * renders a client instance permanently unusable. If this is the case, replacing the client altogether
+ * should allow the feeder to start working again, when it wouldn't otherwise be able to.
+ */
+ private static class ResettableCluster implements Cluster {
+
+ private final Object monitor = new Object();
+ private final ClusterFactory clusterFactory;
+ private AtomicLong inflight = new AtomicLong(0);
+ private Cluster delegate;
+
+ ResettableCluster(ClusterFactory clusterFactory) throws IOException {
+ this.clusterFactory = clusterFactory;
+ this.delegate = clusterFactory.create();
+ }
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ synchronized (monitor) {
+ AtomicLong usedCounter = inflight;
+ Cluster usedCluster = delegate;
+ usedCounter.incrementAndGet();
+ delegate.dispatch(request, vessel);
+ vessel.whenComplete((__, ___) -> {
+ synchronized (monitor) {
+ if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate)
+ usedCluster.close();
+ }
+ });
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (monitor) {
+ delegate.close();
+ }
+ }
+
+ @Override
+ public OperationStats stats() {
+ return delegate.stats();
+ }
+
+ void reset() throws IOException {
+ synchronized (monitor) {
+ delegate = clusterFactory.create();
+ inflight = new AtomicLong(0);
+ }
+ }
+
+ }
+
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index 14ade35825f..cec070c06a6 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -11,6 +11,7 @@ import ai.vespa.feed.client.Result;
import ai.vespa.feed.client.ResultException;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
@@ -32,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpFeedClientTest {
@Test
- void testFeeding() throws ExecutionException, InterruptedException {
+ void testFeeding() throws ExecutionException, InterruptedException, IOException {
DocumentId id = DocumentId.of("ns", "type", "0");
AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@@ -43,7 +44,7 @@ class HttpFeedClientTest {
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
}
FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true),
- new DryrunCluster(),
+ () -> new DryrunCluster(),
new MockRequestStrategy());
// Update is a PUT, and 200 OK is a success.
@@ -211,7 +212,7 @@ class HttpFeedClientTest {
}
@Test
- void testHandshake() {
+ void testHandshake() throws IOException {
// dummy:123 does not exist, and results in a host-not-found exception.
FeedException exception = assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
@@ -238,19 +239,19 @@ class HttpFeedClientTest {
assertEquals("server does not support speed test; upgrade to a newer version",
assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true),
- cluster,
+ () -> cluster,
null))
.getMessage());
// Old server.
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- cluster,
+ () -> cluster,
null);
// New server.
response.set(okResponse);
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- cluster,
+ () -> cluster,
null);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index b9ab4e481ac..f313f08426c 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -17,11 +17,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpRequestStrategyTest {
@Test
- void testConcurrency() {
+ void testConcurrency() throws IOException {
int documents = 1 << 16;
HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
@@ -50,7 +52,7 @@ class HttpRequestStrategyTest {
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setConnectionsPerEndpoint(1 << 10)
.setMaxStreamPerConnection(1 << 12),
- cluster);
+ () -> cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
@@ -82,7 +84,7 @@ class HttpRequestStrategyTest {
}
@Test()
- void testRetries() throws ExecutionException, InterruptedException {
+ void testRetries() throws ExecutionException, InterruptedException, IOException {
int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
@@ -95,7 +97,7 @@ class HttpRequestStrategyTest {
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(1)
.setMaxStreamPerConnection(minStreams),
- cluster);
+ () -> cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
@@ -210,7 +212,54 @@ class HttpRequestStrategyTest {
}
@Test
- void testShutdown() {
+ void testResettingCluster() throws ExecutionException, InterruptedException, IOException {
+ List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster());
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null);
+ HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1),
+ clusters.iterator()::next);
+
+ // First operation fails, second remains in flight, and third fails.
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
+ strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
+ Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>();
+ clusters.get(0).expect((__, vessel) -> {
+ try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); }
+ });
+ CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get));
+ CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null);
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
+
+ // Time advances, and the circuit breaker half-opens.
+ assertEquals(CLOSED, breaker.state());
+ now.addAndGet(2000);
+ assertEquals(HALF_OPEN, breaker.state());
+
+ // It's indeterminate which cluster gets the next request, but the second should get the next one after that.
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
+
+ clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called")));
+ clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
+ assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
+
+ assertFalse(clusters.get(0).closed.get());
+ assertFalse(clusters.get(1).closed.get());
+ secondVessel.complete(HttpResponse.of(504, null));
+ assertEquals(504, secondResponse.get().code());
+ assertTrue(clusters.get(0).closed.get());
+ assertFalse(clusters.get(1).closed.get());
+ strategy.await();
+ strategy.destroy();
+ assertTrue(clusters.get(1).closed.get());
+ }
+
+ @Test
+ void testShutdown() throws IOException {
MockCluster cluster = new MockCluster();
AtomicLong nowNanos = new AtomicLong(0);
CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
@@ -220,7 +269,7 @@ class HttpRequestStrategyTest {
})
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
- cluster);
+ () -> cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
@@ -297,6 +346,7 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
+ final AtomicBoolean closed = new AtomicBoolean(false);
void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
@@ -307,6 +357,11 @@ class HttpRequestStrategyTest {
dispatch.get().accept(request, vessel);
}
+ @Override
+ public void close() {
+ closed.set(true);
+ }
+
}
}