diff options
author | jonmv <venstad@gmail.com> | 2024-05-21 09:55:08 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2024-05-21 09:55:08 +0200 |
commit | a2c6c2ecf05aebc770abab748751d5c38958b012 (patch) | |
tree | bd8edfed1f0b09bd7c74a63b2aa931b2e69bdab1 /vespa-feed-client | |
parent | cd3b7e5701c81844816d3f1376329e40657e8e07 (diff) |
Actually remove inflight from resettable cluster (just count)
Diffstat (limited to 'vespa-feed-client')
4 files changed, 45 insertions, 29 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index 186e3666889..517fa7e4924 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -58,19 +58,18 @@ class HttpFeedClient implements FeedClient { HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { this(builder, - builder.dryrun ? () -> new DryrunCluster() - : () -> { try { return new JettyCluster(builder); } catch (IOException e) { throw new UncheckedIOException(e); } }); + builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder)); } - HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) { - this(builder, clusters, new HttpRequestStrategy(builder, clusters)); + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory)); } - HttpFeedClient(FeedClientBuilderImpl builder, Supplier<Cluster> clusters, RequestStrategy requestStrategy) { + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; - verifyConnection(builder, clusters); + verifyConnection(builder, clusterFactory); } @Override @@ -134,9 +133,9 @@ class HttpFeedClient implements FeedClient { return promise; } - private void verifyConnection(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) { + private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { Instant start = Instant.now(); - try (Cluster cluster = clusters.get()) { + try (Cluster cluster = clusterFactory.create()) { HttpRequest request = new HttpRequest("POST", getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), requestHeaders, @@ -320,4 +319,11 @@ class HttpFeedClient implements FeedClient { return query.toString(); } + /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */ + interface ClusterFactory { + + Cluster create() throws IOException; + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 178d6b6809c..5fe59647038 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -8,12 +8,14 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy; import ai.vespa.feed.client.FeedException; import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory; import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -73,9 +75,9 @@ class HttpRequestStrategy implements RequestStrategy { private final ResettableCluster resettableCluster; private final AtomicBoolean reset = new AtomicBoolean(false); - HttpRequestStrategy(FeedClientBuilderImpl builder, Supplier<Cluster> clusters) { + HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { this.throttler = new DynamicThrottler(builder); - this.resettableCluster = new ResettableCluster(clusters); + this.resettableCluster = new ResettableCluster(clusterFactory); this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; @@ -331,19 +333,29 @@ class HttpRequestStrategy implements RequestStrategy { private static class ResettableCluster implements Cluster { private final Object monitor = new Object(); - private final Deque<CompletableFuture<?>> inflight = new ArrayDeque<>(); - private final Supplier<Cluster> delegates; + private final ClusterFactory clusterFactory; + private AtomicLong inflight = new AtomicLong(0); private Cluster delegate; - ResettableCluster(Supplier<Cluster> delegates) { - this.delegates = delegates; - this.delegate = delegates.get(); + ResettableCluster(ClusterFactory clusterFactory) throws IOException { + this.clusterFactory = clusterFactory; + this.delegate = clusterFactory.create(); } @Override public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { - inflight.add(vessel); - delegate.dispatch(request, vessel); + synchronized (monitor) { + AtomicLong usedCounter = inflight; + Cluster usedCluster = delegate; + usedCounter.incrementAndGet(); + delegate.dispatch(request, vessel); + vessel.whenComplete((__, ___) -> { + synchronized (monitor) { + if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) + usedCluster.close(); + } + }); + } } @Override @@ -358,13 +370,10 @@ class HttpRequestStrategy implements RequestStrategy { return delegate.stats(); } - void reset() { + void reset() throws IOException { synchronized (monitor) { - Cluster obsolete = delegate; - CompletableFuture.allOf(inflight.toArray(CompletableFuture[]::new)) - .whenComplete((__, ___) -> obsolete.close()); - inflight.clear(); - delegate = delegates.get(); + delegate = clusterFactory.create(); + inflight = new AtomicLong(0); } } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index 0f1d7180c4c..cec070c06a6 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -11,6 +11,7 @@ import ai.vespa.feed.client.Result; import ai.vespa.feed.client.ResultException; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.List; @@ -32,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpFeedClientTest { @Test - void testFeeding() throws ExecutionException, InterruptedException { + void testFeeding() throws ExecutionException, InterruptedException, IOException { DocumentId id = DocumentId.of("ns", "type", "0"); AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @@ -211,7 +212,7 @@ class HttpFeedClientTest { } @Test - void testHandshake() { + void testHandshake() throws IOException { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 3751f6d7af2..51c6ee550e5 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpRequestStrategyTest { @Test - void testConcurrency() { + void testConcurrency() throws IOException { int documents = 1 << 16; HttpRequest request = new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); @@ -84,7 +84,7 @@ class HttpRequestStrategyTest { } @Test() - void testRetries() throws ExecutionException, InterruptedException { + void testRetries() throws ExecutionException, InterruptedException, IOException { int minStreams = 2; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); @@ -212,7 +212,7 @@ class HttpRequestStrategyTest { } @Test - void testResettingCluster() throws ExecutionException, InterruptedException { + void testResettingCluster() throws ExecutionException, InterruptedException, IOException { List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster()); AtomicLong now = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null); @@ -259,7 +259,7 @@ class HttpRequestStrategyTest { } @Test - void testShutdown() { + void testShutdown() throws IOException { MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); |