diff options
Diffstat (limited to 'vespa-feed-client/src/main')
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java | 23 | ||||
-rw-r--r-- | vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java | 77 |
2 files changed, 9 insertions, 91 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index d12d72f7a70..5eb611160cc 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; -import java.io.UncheckedIOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.time.Duration; @@ -57,19 +56,18 @@ class HttpFeedClient implements FeedClient { private final boolean speedTest; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { - this(builder, - builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder)); + this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder)); } - HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { - this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory)); + HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) { + this(builder, cluster, new HttpRequestStrategy(builder, cluster)); } - HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException { + HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; - verifyConnection(builder, clusterFactory); + verifyConnection(builder, cluster); } @Override @@ -133,9 +131,9 @@ class HttpFeedClient implements FeedClient { return promise; } - private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) { Instant start = Instant.now(); - try (Cluster cluster = clusterFactory.create()) { + try { HttpRequest request = new HttpRequest("POST", getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), requestHeaders, @@ -319,11 +317,4 @@ class HttpFeedClient implements FeedClient { return query.toString(); } - /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */ - interface ClusterFactory { - - Cluster create() throws IOException; - - } - } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 5fe59647038..f699651634a 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -8,14 +8,10 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy; import ai.vespa.feed.client.FeedException; import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; -import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; import java.util.Map; import java.util.Queue; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -25,7 +21,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -71,14 +66,10 @@ class HttpRequestStrategy implements RequestStrategy { thread.setDaemon(true); return thread; }); - // TODO jonmv: remove if this has no effect - private final ResettableCluster resettableCluster; - private final AtomicBoolean reset = new AtomicBoolean(false); - HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { this.throttler = new DynamicThrottler(builder); - this.resettableCluster = new ResettableCluster(clusterFactory); - this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster; + this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; @@ -101,12 +92,6 @@ class HttpRequestStrategy implements RequestStrategy { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); - - if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true)) - resettableCluster.reset(); - else if (breaker.state() == CLOSED) - reset.set(false); - // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1); } @@ -185,10 +170,6 @@ class HttpRequestStrategy implements RequestStrategy { return retry(request, attempt); } - if (response.code() >= 500) { // Server errors may indicate something wrong with the server. - breaker.failure(response); - } - return false; } @@ -325,58 +306,4 @@ class HttpRequestStrategy implements RequestStrategy { } } - /** - * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes - * renders a client instance permanently unusable. If this is the case, replacing the client altogether - * should allow the feeder to start working again, when it wouldn't otherwise be able to. - */ - private static class ResettableCluster implements Cluster { - - private final Object monitor = new Object(); - private final ClusterFactory clusterFactory; - private AtomicLong inflight = new AtomicLong(0); - private Cluster delegate; - - ResettableCluster(ClusterFactory clusterFactory) throws IOException { - this.clusterFactory = clusterFactory; - this.delegate = clusterFactory.create(); - } - - @Override - public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { - synchronized (monitor) { - AtomicLong usedCounter = inflight; - Cluster usedCluster = delegate; - usedCounter.incrementAndGet(); - delegate.dispatch(request, vessel); - vessel.whenComplete((__, ___) -> { - synchronized (monitor) { - if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) - usedCluster.close(); - } - }); - } - } - - @Override - public void close() { - synchronized (monitor) { - delegate.close(); - } - } - - @Override - public OperationStats stats() { - return delegate.stats(); - } - - void reset() throws IOException { - synchronized (monitor) { - delegate = clusterFactory.create(); - inflight = new AtomicLong(0); - } - } - - } - } |