aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-05-21 09:55:08 +0200
committerjonmv <venstad@gmail.com>2024-05-21 09:55:08 +0200
commita2c6c2ecf05aebc770abab748751d5c38958b012 (patch)
treebd8edfed1f0b09bd7c74a63b2aa931b2e69bdab1 /vespa-feed-client
parentcd3b7e5701c81844816d3f1376329e40657e8e07 (diff)
Actually remove inflight from resettable cluster (just count)
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java22
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java39
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java5
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java8
4 files changed, 45 insertions, 29 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index 186e3666889..517fa7e4924 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -58,19 +58,18 @@ class HttpFeedClient implements FeedClient {
HttpFeedClient(FeedClientBuilderImpl builder) throws IOException {
this(builder,
- builder.dryrun ? () -> new DryrunCluster()
- : () -> { try { return new JettyCluster(builder); } catch (IOException e) { throw new UncheckedIOException(e); } });
+ builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder));
}
- HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) {
- this(builder, clusters, new HttpRequestStrategy(builder, clusters));
+ HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
+ this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory));
}
- HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters, RequestStrategy requestStrategy) {
+ HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException {
this.requestHeaders = new HashMap<>(builder.requestHeaders);
this.requestStrategy = requestStrategy;
this.speedTest = builder.speedTest;
- verifyConnection(builder, clusters);
+ verifyConnection(builder, clusterFactory);
}
@Override
@@ -134,9 +133,9 @@ class HttpFeedClient implements FeedClient {
return promise;
}
- private void verifyConnection(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) {
+ private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
Instant start = Instant.now();
- try (Cluster cluster = clusters.get()) {
+ try (Cluster cluster = clusterFactory.create()) {
HttpRequest request = new HttpRequest("POST",
getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true),
requestHeaders,
@@ -320,4 +319,11 @@ class HttpFeedClient implements FeedClient {
return query.toString();
}
+ /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */
+ interface ClusterFactory {
+
+ Cluster create() throws IOException;
+
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
index 178d6b6809c..5fe59647038 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java
@@ -8,12 +8,14 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy;
import ai.vespa.feed.client.FeedException;
import ai.vespa.feed.client.HttpResponse;
import ai.vespa.feed.client.OperationStats;
+import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -73,9 +75,9 @@ class HttpRequestStrategy implements RequestStrategy {
private final ResettableCluster resettableCluster;
private final AtomicBoolean reset = new AtomicBoolean(false);
- HttpRequestStrategy(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) {
+ HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException {
this.throttler = new DynamicThrottler(builder);
- this.resettableCluster = new ResettableCluster(clusters);
+ this.resettableCluster = new ResettableCluster(clusterFactory);
this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
@@ -331,19 +333,29 @@ class HttpRequestStrategy implements RequestStrategy {
private static class ResettableCluster implements Cluster {
private final Object monitor = new Object();
- private final Deque<CompletableFuture<?>> inflight = new ArrayDeque<>();
- private final Supplier<Cluster> delegates;
+ private final ClusterFactory clusterFactory;
+ private AtomicLong inflight = new AtomicLong(0);
private Cluster delegate;
- ResettableCluster(Supplier<Cluster> delegates) {
- this.delegates = delegates;
- this.delegate = delegates.get();
+ ResettableCluster(ClusterFactory clusterFactory) throws IOException {
+ this.clusterFactory = clusterFactory;
+ this.delegate = clusterFactory.create();
}
@Override
public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
- inflight.add(vessel);
- delegate.dispatch(request, vessel);
+ synchronized (monitor) {
+ AtomicLong usedCounter = inflight;
+ Cluster usedCluster = delegate;
+ usedCounter.incrementAndGet();
+ delegate.dispatch(request, vessel);
+ vessel.whenComplete((__, ___) -> {
+ synchronized (monitor) {
+ if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate)
+ usedCluster.close();
+ }
+ });
+ }
}
@Override
@@ -358,13 +370,10 @@ class HttpRequestStrategy implements RequestStrategy {
return delegate.stats();
}
- void reset() {
+ void reset() throws IOException {
synchronized (monitor) {
- Cluster obsolete = delegate;
- CompletableFuture.allOf(inflight.toArray(CompletableFuture[]::new))
- .whenComplete((__, ___) -> obsolete.close());
- inflight.clear();
- delegate = delegates.get();
+ delegate = clusterFactory.create();
+ inflight = new AtomicLong(0);
}
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
index 0f1d7180c4c..cec070c06a6 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java
@@ -11,6 +11,7 @@ import ai.vespa.feed.client.Result;
import ai.vespa.feed.client.ResultException;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
@@ -32,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpFeedClientTest {
@Test
- void testFeeding() throws ExecutionException, InterruptedException {
+ void testFeeding() throws ExecutionException, InterruptedException, IOException {
DocumentId id = DocumentId.of("ns", "type", "0");
AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>();
class MockRequestStrategy implements RequestStrategy {
@@ -211,7 +212,7 @@ class HttpFeedClientTest {
}
@Test
- void testHandshake() {
+ void testHandshake() throws IOException {
// dummy:123 does not exist, and results in a host-not-found exception.
FeedException exception = assertThrows(FeedException.class,
() -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123")))));
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index 3751f6d7af2..51c6ee550e5 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class HttpRequestStrategyTest {
@Test
- void testConcurrency() {
+ void testConcurrency() throws IOException {
int documents = 1 << 16;
HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0);
HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8));
@@ -84,7 +84,7 @@ class HttpRequestStrategyTest {
}
@Test()
- void testRetries() throws ExecutionException, InterruptedException {
+ void testRetries() throws ExecutionException, InterruptedException, IOException {
int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
AtomicLong now = new AtomicLong(0);
@@ -212,7 +212,7 @@ class HttpRequestStrategyTest {
}
@Test
- void testResettingCluster() throws ExecutionException, InterruptedException {
+ void testResettingCluster() throws ExecutionException, InterruptedException, IOException {
List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster());
AtomicLong now = new AtomicLong(0);
CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null);
@@ -259,7 +259,7 @@ class HttpRequestStrategyTest {
}
@Test
- void testShutdown() {
+ void testShutdown() throws IOException {
MockCluster cluster = new MockCluster();
AtomicLong now = new AtomicLong(0);
CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));