diff options
Diffstat (limited to 'vespa-feed-client/src/main/java')
9 files changed, 215 insertions, 66 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java index 99c891696f5..bd1309d3bca 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java @@ -48,9 +48,7 @@ public class BenchmarkingCluster implements Cluster { public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { requests.incrementAndGet(); long startNanos = System.nanoTime(); - if (timeOfFirstDispatch.get() == 0) { - timeOfFirstDispatch.set(startNanos); - } + timeOfFirstDispatch.compareAndSet(0, startNanos); delegate.dispatch(request, vessel); vessel.whenCompleteAsync((response, thrown) -> { results++; @@ -94,8 +92,8 @@ public class BenchmarkingCluster implements Cluster { if (responsesByCode[code] > 0) responses.put(code, responsesByCode[code]); - double duration = (System.nanoTime() - timeOfFirstDispatch.get())*1e-9; - return new OperationStats(duration, requests, responses, exceptions, + double duration = (System.nanoTime() - timeOfFirstDispatch.get()) * 1e-9; + return new OperationStats(duration, requests, responses, exceptions, requests - results, throttler.targetInflight(), this.responses == 0 ? -1 : totalLatencyMillis / this.responses, this.responses == 0 ? -1 : minLatencyMillis, diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index 951a1776b6f..3344a372734 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -28,13 +28,13 @@ public class DynamicThrottler extends StaticThrottler { public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); - targetInflight = new AtomicLong(8 * minInflight); + targetInflight = new AtomicLong(minInflight); } @Override public void sent(long __, CompletableFuture<HttpResponse> ___) { - double currentInflight = targetInflight.get(); - if (++sent * sent * sent < 1e2 * currentInflight * currentInflight) + double currentInflight = targetInflight(); + if (++sent * sent * sent < 1e3 * currentInflight * currentInflight) return; sent = 0; @@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler { // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight))) - / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) + / log(256)); // 512 (server max streams per connection) / 2 (our min per connection) throughputs[index] = currentThroughput; // Loop over throughput measurements and pick the one which optimises throughput and latency. - double choice = currentInflight; + double best = currentInflight; double max = -1; - for (int i = throughputs.length; i-- > 0; ) { + int j = -1, k = -1, choice = 0; + double s = 0; + for (int i = 0; i < throughputs.length; i++) { if (throughputs[i] == 0) continue; // Skip unknown values. double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length); double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight). if (objective > max) { max = objective; - choice = inflight; + best = inflight; + choice = i; } + // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness. + if (j != -1) { + double t = throughputs[j]; + if (k != -1) throughputs[j] = (18 * t + throughputs[i] + s) / 20; + s = t; + } + k = j; + j = i; } - long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase. + long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase. + // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration. + if (choice == j && choice + 1 < throughputs.length) + target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length)); targetInflight.set(max(minInflight, min(maxInflight, target))); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index c271ac356e9..d5eab8e17af 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; @@ -57,10 +58,9 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { Compression compression = auto; URI proxy; Duration connectionTtl = Duration.ZERO; + LongSupplier nanoClock = System::nanoTime; - - public FeedClientBuilderImpl() { - } + public FeedClientBuilderImpl() { } FeedClientBuilderImpl(List<URI> endpoints) { this(); @@ -252,6 +252,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { return this; } + FeedClientBuilderImpl setNanoClock(LongSupplier nanoClock) { + this.nanoClock = requireNonNull(nanoClock); + return this; + } + /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ @Override public FeedClient build() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java index 1ea2089c0eb..cec7106403e 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java @@ -12,7 +12,6 @@ import java.util.function.LongSupplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; -import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; @@ -24,22 +23,22 @@ import static java.util.logging.Level.WARNING; public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName()); - private static final long NEVER = 1L << 60; - private final AtomicLong failingSinceMillis = new AtomicLong(NEVER); private final AtomicBoolean halfOpen = new AtomicBoolean(false); private final AtomicBoolean open = new AtomicBoolean(false); - private final LongSupplier clock; + private final LongSupplier nanoClock; + private final long never; + private final AtomicLong failingSinceNanos; private final AtomicReference<String> detail = new AtomicReference<>(); - private final long graceMillis; - private final long doomMillis; + private final long graceNanos; + private final long doomNanos; /** * Creates a new circuit breaker with the given grace periods. * @param grace the period of consecutive failures before state changes to half-open. */ public GracePeriodCircuitBreaker(Duration grace) { - this(System::currentTimeMillis, grace, null); + this(System::nanoTime, grace, null); } /** @@ -48,23 +47,25 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { * @param doom the period of consecutive failures before shutting down. */ public GracePeriodCircuitBreaker(Duration grace, Duration doom) { - this(System::currentTimeMillis, grace, doom); + this(System::nanoTime, grace, doom); if (doom.isNegative()) throw new IllegalArgumentException("Doom delay must be non-negative"); } - GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) { + GracePeriodCircuitBreaker(LongSupplier nanoClock, Duration grace, Duration doom) { if (grace.isNegative()) throw new IllegalArgumentException("Grace delay must be non-negative"); - this.clock = requireNonNull(clock); - this.graceMillis = grace.toMillis(); - this.doomMillis = doom == null ? -1 : doom.toMillis(); + this.nanoClock = requireNonNull(nanoClock); + this.never = nanoClock.getAsLong() + (1L << 60); + this.graceNanos = grace.toNanos(); + this.doomNanos = doom == null ? -1 : doom.toNanos(); + this.failingSinceNanos = new AtomicLong(never); } @Override public void success() { - failingSinceMillis.set(NEVER); + failingSinceNanos.set(never); if ( ! open.get() && halfOpen.compareAndSet(true, false)) log.log(INFO, "Circuit breaker is now closed, after a request was successful"); } @@ -80,21 +81,21 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { } private void failure(String detail) { - if (failingSinceMillis.compareAndSet(NEVER, clock.getAsLong())) + if (failingSinceNanos.compareAndSet(never, nanoClock.getAsLong())) this.detail.set(detail); } @Override public State state() { - long failingMillis = clock.getAsLong() - failingSinceMillis.get(); - if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) + long failingNanos = nanoClock.getAsLong() - failingSinceNanos.get(); + if (failingNanos > graceNanos && halfOpen.compareAndSet(false, true)) log.log(INFO, "Circuit breaker is now half-open, as no requests have succeeded for the " + - "last " + failingMillis + "ms. The server will be pinged to see if it recovers" + - (doomMillis >= 0 ? ", but this client will give up if no successes are observed within " + doomMillis + "ms" : "") + + "last " + failingNanos / 1_000_000 + "ms. The server will be pinged to see if it recovers" + + (doomNanos >= 0 ? ", but this client will give up if no successes are observed within " + doomNanos / 1_000_000 + "ms" : "") + ". First failure was '" + detail.get() + "'."); - if (doomMillis >= 0 && failingMillis > doomMillis && open.compareAndSet(false, true)) - log.log(WARNING, "Circuit breaker is now open, after " + doomMillis + "ms of failing request, " + + if (doomNanos >= 0 && failingNanos > doomNanos && open.compareAndSet(false, true)) + log.log(WARNING, "Circuit breaker is now open, after " + doomNanos / 1_000_000 + "ms of failing request, " + "and this client will give up and abort its remaining feed operations."); return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index a30cfd5ec39..8ee281fb38d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -17,13 +17,13 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; +import java.io.UncheckedIOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.OperationParameters.empty; @@ -45,6 +46,7 @@ import static java.util.Objects.requireNonNull; */ class HttpFeedClient implements FeedClient { + private static final Duration maxTimeout = Duration.ofMinutes(15); private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder() .streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build()) .build(); @@ -53,20 +55,23 @@ class HttpFeedClient implements FeedClient { private final RequestStrategy requestStrategy; private final AtomicBoolean closed = new AtomicBoolean(); private final boolean speedTest; + private final LongSupplier nanoClock; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { - this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder)); + this(builder, + builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder)); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) { - this(builder, cluster, new HttpRequestStrategy(builder, cluster)); + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory)); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) { + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; - verifyConnection(builder, cluster); + this.nanoClock = builder.nanoClock; + verifyConnection(builder, clusterFactory); } @Override @@ -108,10 +113,12 @@ class HttpFeedClient implements FeedClient { throw new IllegalStateException("Client is closed"); HttpRequest request = new HttpRequest(method, - getPath(documentId) + getQuery(params, speedTest), + getPath(documentId), + getQuery(params, speedTest), requestHeaders, operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? - params.timeout().orElse(null)); + params.timeout().orElse(maxTimeout), + nanoClock); CompletableFuture<Result> promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) @@ -129,14 +136,16 @@ class HttpFeedClient implements FeedClient { return promise; } - private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) { + private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { Instant start = Instant.now(); - try { + try (Cluster cluster = clusterFactory.create()) { HttpRequest request = new HttpRequest("POST", - getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), + getPath(DocumentId.of("feeder", "handshake", "dummy")), + getQuery(empty(), true), requestHeaders, null, - Duration.ofSeconds(15)); + Duration.ofSeconds(15), + nanoClock); CompletableFuture<HttpResponse> future = new CompletableFuture<>(); cluster.dispatch(request, future); HttpResponse response = future.get(20, TimeUnit.SECONDS); @@ -219,13 +228,13 @@ class HttpFeedClient implements FeedClient { throw new ResultParseException(documentId, "Expected 'trace' to be an array, but got '" + parser.currentToken() + "' in: " + new String(json, UTF_8)); - int start = (int) parser.getTokenLocation().getByteOffset(); + int start = (int) parser.currentTokenLocation().getByteOffset(); int depth = 1; while (depth > 0) switch (parser.nextToken()) { case START_ARRAY: ++depth; break; case END_ARRAY: --depth; break; } - int end = (int) parser.getTokenLocation().getByteOffset() + 1; + int end = (int) parser.currentTokenLocation().getByteOffset() + 1; trace = new String(json, start, end - start, UTF_8); break; default: @@ -307,11 +316,17 @@ class HttpFeedClient implements FeedClient { StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue(""); if (params.createIfNonExistent()) query.add("create=true"); params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition))); - params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms")); params.route().ifPresent(route -> query.add("route=" + encode(route))); params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel)); if (speedTest) query.add("dryRun=true"); return query.toString(); } + /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */ + interface ClusterFactory { + + Cluster create() throws IOException; + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index 6de3f034f22..22f6eaa75a4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -3,30 +3,37 @@ package ai.vespa.feed.client.impl; import java.time.Duration; import java.util.Map; +import java.util.function.LongSupplier; import java.util.function.Supplier; class HttpRequest { private final String method; private final String path; + private final String query; private final Map<String, Supplier<String>> headers; private final byte[] body; private final Duration timeout; + private final long deadlineNanos; + private final LongSupplier nanoClock; - public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) { + public HttpRequest(String method, String path, String query, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { this.method = method; this.path = path; + this.query = query; this.headers = headers; this.body = body; + this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos(); this.timeout = timeout; + this.nanoClock = nanoClock; } public String method() { return method; } - public String path() { - return path; + public String pathAndQuery() { + return path + (query.isEmpty() ? "?" : query + "&") + "timeout=" + Math.max(1, timeLeft().toMillis()) + "ms"; } public Map<String, Supplier<String>> headers() { @@ -37,6 +44,10 @@ class HttpRequest { return body; } + public Duration timeLeft() { + return Duration.ofNanos(deadlineNanos - nanoClock.getAsLong()); + } + public Duration timeout() { return timeout; } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 8f0327a1738..dc902297d6d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -8,19 +8,25 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy; import ai.vespa.feed.client.FeedException; import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,6 +37,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; /** @@ -65,10 +73,14 @@ class HttpRequestStrategy implements RequestStrategy { thread.setDaemon(true); return thread; }); + // TODO jonmv: remove if this has no effect + private final ResettableCluster resettableCluster; + private final AtomicBoolean reset = new AtomicBoolean(false); - HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { + HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { this.throttler = new DynamicThrottler(builder); - this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; + this.resettableCluster = new ResettableCluster(clusterFactory); + this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; @@ -91,12 +103,18 @@ class HttpRequestStrategy implements RequestStrategy { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); + + if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true)) + resettableCluster.reset(); + else if (breaker.state() == CLOSED) + reset.set(false); + // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1); } } catch (Throwable t) { - log.log(WARNING, "Dispatch thread threw; shutting down", t); + log.log(SEVERE, "Dispatch thread threw; shutting down", t); } destroy(); } @@ -119,7 +137,7 @@ class HttpRequestStrategy implements RequestStrategy { } private boolean retry(HttpRequest request, int attempt) { - if (attempt > strategy.retries()) + if (attempt > strategy.retries() || request.timeLeft().toMillis() <= 0) return false; switch (request.method().toUpperCase()) { @@ -137,9 +155,8 @@ class HttpRequestStrategy implements RequestStrategy { private boolean retry(HttpRequest request, Throwable thrown, int attempt) { breaker.failure(thrown); if ( (thrown instanceof IOException) // General IO problems. - // Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server - || (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed")) + || (thrown instanceof IllegalStateException && "session closed".equals(thrown.getMessage())) ) { log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request); return retry(request, attempt); @@ -149,7 +166,7 @@ class HttpRequestStrategy implements RequestStrategy { return false; } - /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */ + /** Retries throttled requests (429), adjusting the target inflight count, and server unavailable (503). */ private boolean retry(HttpRequest request, HttpResponse response, int attempt) { if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) { logResponse(FINEST, response, request, attempt); @@ -170,6 +187,10 @@ class HttpRequestStrategy implements RequestStrategy { return retry(request, attempt); } + if (response.code() >= 500) { // Server errors may indicate something wrong with the server. + breaker.failure(response); + } + return false; } @@ -272,7 +293,8 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) { + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, + RetriableFuture<HttpResponse> result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { result.set(response, thrown); // Retry the operation if it failed with a transient error ... @@ -305,4 +327,72 @@ class HttpRequestStrategy implements RequestStrategy { } } + /** + * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes + * renders a client instance permanently unusable. If this is the case, replacing the client altogether + * should allow the feeder to start working again, when it wouldn't otherwise be able to. + */ + private static class ResettableCluster implements Cluster { + + private final Object monitor = new Object(); + private final ClusterFactory clusterFactory; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private AtomicLong inflight = new AtomicLong(0); + private Cluster delegate; + + ResettableCluster(ClusterFactory clusterFactory) throws IOException { + this.clusterFactory = clusterFactory; + this.delegate = clusterFactory.create(); + } + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + synchronized (monitor) { + AtomicLong usedCounter = inflight; + usedCounter.incrementAndGet(); + Cluster usedCluster = delegate; + usedCluster.dispatch(request, vessel); + vessel.whenCompleteAsync((__, ___) -> { + synchronized (monitor) { + if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) { + log.log(INFO, "Closing old HTTP client"); + usedCluster.close(); + } + } + }, + executor); + } + } + + @Override + public void close() { + synchronized (monitor) { + delegate.close(); + executor.shutdown(); + try { + if ( ! executor.awaitTermination(1, TimeUnit.MINUTES)) + log.log(WARNING, "Failed shutting down HTTP client within 1 minute"); + } + catch (InterruptedException e) { + log.log(WARNING, "Interrupted waiting for HTTP client to shut down"); + Thread.currentThread().interrupt(); + } + } + } + + @Override + public OperationStats stats() { + return delegate.stats(); + } + + void reset() throws IOException { + synchronized (monitor) { + log.log(INFO, "Replacing underlying HTTP client to attempt recovery"); + delegate = clusterFactory.create(); + inflight = new AtomicLong(0); + } + } + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index df010a167f6..28e5b5d0a21 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -45,8 +45,11 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; @@ -61,6 +64,7 @@ import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; * @author bjorncs */ class JettyCluster implements Cluster { + private static final Logger log = Logger.getLogger(JettyCluster.class.getName()); // Socket timeout must be longer than the longest feasible response timeout private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(15); @@ -81,9 +85,12 @@ class JettyCluster implements Cluster { Endpoint endpoint = findLeastBusyEndpoint(endpoints); try { endpoint.inflight.incrementAndGet(); - long reqTimeoutMillis = req.timeout() != null - ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); - Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) + long reqTimeoutMillis = req.timeLeft().toMillis(); + if (reqTimeoutMillis <= 0) { + vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'")); + return; + } + Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.pathAndQuery())) .version(HttpVersion.HTTP_2) .method(HttpMethod.fromString(req.method())) .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) @@ -104,15 +111,23 @@ class JettyCluster implements Cluster { } jettyReq.body(new BytesRequestContent(APPLICATION_JSON.asString(), bytes)); } + log.log(Level.FINER, () -> + String.format("Dispatching request %s (%s)", req, System.identityHashCode(vessel))); jettyReq.send(new BufferingResponseListener() { @Override public void onComplete(Result result) { + log.log(Level.FINER, () -> + String.format("Completed request %s (%s): %s", + req, System.identityHashCode(vessel), + result.isFailed() + ? result.getFailure().toString() : result.getResponse().getStatus())); endpoint.inflight.decrementAndGet(); if (result.isFailed()) vessel.completeExceptionally(result.getFailure()); else vessel.complete(new JettyResponse(result.getResponse(), getContent())); } }); } catch (Exception e) { + log.log(Level.FINE, e, () -> "Failed to dispatch request: " + e.getMessage()); endpoint.inflight.decrementAndGet(); vessel.completeExceptionally(e); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index 9010b0a7ad8..f0ee434e87c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler { public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size(); - maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. + maxInflight = 256 * minInflight; // 512 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. } |