aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-05-16 23:28:38 +0200
committerjonmv <venstad@gmail.com>2024-05-16 23:28:38 +0200
commitcd3b7e5701c81844816d3f1376329e40657e8e07 (patch)
tree901568d46d9fde911b6ca68006eaeb5466709014 /vespa-feed-client
parentd7954c1b84d26b7f6b429159c3d9b956dc39eaef (diff)
Replace Jetty client when tripping circuit breaker
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java17
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java68
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java8
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java64
4 files changed, 138 insertions, 19 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index f876c4efade..186e3666889 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.StreamReadConstraints;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Duration;
@@ -56,18 +57,20 @@ class HttpFeedClient implements FeedClient {
private final boolean speedTest;
HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
- this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder));
+ this(builder,
+ builder.dryrun ? () -> new DryrunCluster()
+ : () -> { try { return new JettyCluster(builder); } catch (IOException e) { throw new UncheckedIOException(e); } });
}
- HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) {
- this(builder, cluster, new HttpRequestStrategy(builder, cluster));
+ HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) {
+ this(builder, clusters, new HttpRequestStrategy(builder, clusters));
}
- HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) {
+ HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters, RequestStrategy requestStrategy) {
this.requestHeaders = new HashMap<>(builder.requestHeaders);
this.requestStrategy = requestStrategy;
this.speedTest = builder.speedTest;
- verifyConnection(builder, cluster);
+ verifyConnection(builder, clusters);
}
@Override
@@ -131,9 +134,9 @@ class HttpFeedClient implements FeedClient {
return promise;
}
- private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) {
+ private void verifyConnection(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) {
Instant start = Instant.now();
- try {
+ try (Cluster cluster = clusters.get()) {
HttpRequest request = new HttpRequest("POST",
getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
requestHeaders,
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index f699651634a..178d6b6809c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -10,6 +10,8 @@ import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
@@ -21,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -66,10 +69,14 @@ class HttpRequestStrategy implements RequestStrategy {
thread.setDaemon(true);
return thread;
});
+ // TODO jonmv: remove if this has no effect
+ private final ResettableCluster resettableCluster;
+ private final AtomicBoolean reset = new AtomicBoolean(false);
- HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) {
+ HttpRequestStrategy(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) {
this.throttler = new DynamicThrottler(builder);
- this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster;
+ this.resettableCluster = new ResettableCluster(clusters);
+ this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
@@ -92,6 +99,12 @@ class HttpRequestStrategy implements RequestStrategy {
try {
while (breaker.state() != OPEN && ! destroyed.get()) {
while ( ! isInExcess() && poll() && breaker.state() == CLOSED);
+
+ if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true))
+ resettableCluster.reset();
+ else if (breaker.state() == CLOSED)
+ reset.set(false);
+
// Sleep when circuit is half-open, nap when queue is empty, or we are throttled.
Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1);
}
@@ -170,6 +183,10 @@ class HttpRequestStrategy implements RequestStrategy {
return retry(request, attempt);
}
+ if (response.code() >= 500) { // Server errors may indicate something wrong with the server.
+ breaker.failure(response);
+ }
+
return false;
}
@@ -306,4 +323,51 @@ class HttpRequestStrategy implements RequestStrategy {
}
}
+ /**
+ * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes
+ * renders a client instance permanently unusable. If this is the case, replacing the client altogether
+ * should allow the feeder to start working again, when it wouldn't otherwise be able to.
+ */
+ private static class ResettableCluster implements Cluster {
+
+ private final Object monitor = new Object();
+ private final Deque<CompletableFuture<?>> inflight = new ArrayDeque<>();
+ private final Supplier<Cluster> delegates;
+ private Cluster delegate;
+
+ ResettableCluster(Supplier<Cluster> delegates) {
+ this.delegates = delegates;
+ this.delegate = delegates.get();
+ }
+
+ @Override
+ public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
+ inflight.add(vessel);
+ delegate.dispatch(request, vessel);
+ }
+
+ @Override
+ public void close() {
+ synchronized (monitor) {
+ delegate.close();
+ }
+ }
+
+ @Override
+ public OperationStats stats() {
+ return delegate.stats();
+ }
+
+ void reset() {
+ synchronized (monitor) {
+ Cluster obsolete = delegate;
+ CompletableFuture.allOf(inflight.toArray(CompletableFuture[]::new))
+ .whenComplete((__, ___) -> obsolete.close());
+ inflight.clear();
+ delegate = delegates.get();
+ }
+ }
+
+ }
+
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index 14ade35825f..0f1d7180c4c 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -43,7 +43,7 @@ class HttpFeedClientTest {
@Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); }
}
FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true),
- new DryrunCluster(),
+ () -> new DryrunCluster(),
new MockRequestStrategy());
// Update is a PUT, and 200 OK is a success.
@@ -238,19 +238,19 @@ class HttpFeedClientTest {
assertEquals("server does not support speed test; upgrade to a newer version",
assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true),
- cluster,
+ () -> cluster,
null))
.getMessage());
// Old server.
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- cluster,
+ () -> cluster,
null);
// New server.
response.set(okResponse);
new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))),
- cluster,
+ () -> cluster,
null);
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index b06971ea0b1..3751f6d7af2 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -11,20 +11,19 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URI;
-import java.time.Clock;
import java.time.Duration;
-import java.time.Instant;
-import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -53,7 +52,7 @@ class HttpRequestStrategyTest {
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setConnectionsPerEndpoint(1 << 10)
.setMaxStreamPerConnection(1 << 12),
- cluster);
+ () -> cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
@@ -98,7 +97,7 @@ class HttpRequestStrategyTest {
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(1)
.setMaxStreamPerConnection(minStreams),
- cluster);
+ () -> cluster);
OperationStats initial = strategy.stats();
DocumentId id1 = DocumentId.of("ns", "type", "1");
@@ -213,6 +212,53 @@ class HttpRequestStrategyTest {
}
@Test
+ void testResettingCluster() throws ExecutionException, InterruptedException {
+ List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster());
+ AtomicLong now = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null);
+ HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
+ .setCircuitBreaker(breaker)
+ .setConnectionsPerEndpoint(1),
+ clusters.iterator()::next);
+
+ // First operation fails, second remains in flight, and third fails.
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
+ strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
+ Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>();
+ clusters.get(0).expect((__, vessel) -> {
+ try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); }
+ });
+ CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get));
+ CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null);
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get();
+
+ // Time advances, and the circuit breaker half-opens.
+ assertEquals(CLOSED, breaker.state());
+ now.addAndGet(2000);
+ assertEquals(HALF_OPEN, breaker.state());
+
+ // It's indeterminate which cluster gets the next request, but the second should get the next one after that.
+ clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null)));
+ assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
+
+ clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called")));
+ clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null)));
+ assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get)).get().code());
+
+ assertFalse(clusters.get(0).closed.get());
+ assertFalse(clusters.get(1).closed.get());
+ secondVessel.complete(HttpResponse.of(504, null));
+ assertEquals(504, secondResponse.get().code());
+ assertTrue(clusters.get(0).closed.get());
+ assertFalse(clusters.get(1).closed.get());
+ strategy.await();
+ strategy.destroy();
+ assertTrue(clusters.get(1).closed.get());
+ }
+
+ @Test
void testShutdown() {
MockCluster cluster = new MockCluster();
AtomicLong now = new AtomicLong(0);
@@ -223,7 +269,7 @@ class HttpRequestStrategyTest {
})
.setCircuitBreaker(breaker)
.setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
- cluster);
+ () -> cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
@@ -300,6 +346,7 @@ class HttpRequestStrategyTest {
static class MockCluster implements Cluster {
final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
+ final AtomicBoolean closed = new AtomicBoolean(false);
void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) {
dispatch.set(expected);
@@ -310,6 +357,11 @@ class HttpRequestStrategyTest {
dispatch.get().accept(request, vessel);
}
+ @Override
+ public void close() {
+ closed.set(true);
+ }
+
}
}