diff options
Diffstat (limited to 'vespa-feed-client')
13 files changed, 399 insertions, 140 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java index 99c891696f5..bd1309d3bca 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java @@ -48,9 +48,7 @@ public class BenchmarkingCluster implements Cluster { public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { requests.incrementAndGet(); long startNanos = System.nanoTime(); - if (timeOfFirstDispatch.get() == 0) { - timeOfFirstDispatch.set(startNanos); - } + timeOfFirstDispatch.compareAndSet(0, startNanos); delegate.dispatch(request, vessel); vessel.whenCompleteAsync((response, thrown) -> { results++; @@ -94,8 +92,8 @@ public class BenchmarkingCluster implements Cluster { if (responsesByCode[code] > 0) responses.put(code, responsesByCode[code]); - double duration = (System.nanoTime() - timeOfFirstDispatch.get())*1e-9; - return new OperationStats(duration, requests, responses, exceptions, + double duration = (System.nanoTime() - timeOfFirstDispatch.get()) * 1e-9; + return new OperationStats(duration, requests, responses, exceptions, requests - results, throttler.targetInflight(), this.responses == 0 ? -1 : totalLatencyMillis / this.responses, this.responses == 0 ? -1 : minLatencyMillis, diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index 951a1776b6f..3344a372734 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -28,13 +28,13 @@ public class DynamicThrottler extends StaticThrottler { public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); - targetInflight = new AtomicLong(8 * minInflight); + targetInflight = new AtomicLong(minInflight); } @Override public void sent(long __, CompletableFuture<HttpResponse> ___) { - double currentInflight = targetInflight.get(); - if (++sent * sent * sent < 1e2 * currentInflight * currentInflight) + double currentInflight = targetInflight(); + if (++sent * sent * sent < 1e3 * currentInflight * currentInflight) return; sent = 0; @@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler { // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight))) - / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) + / log(256)); // 512 (server max streams per connection) / 2 (our min per connection) throughputs[index] = currentThroughput; // Loop over throughput measurements and pick the one which optimises throughput and latency. - double choice = currentInflight; + double best = currentInflight; double max = -1; - for (int i = throughputs.length; i-- > 0; ) { + int j = -1, k = -1, choice = 0; + double s = 0; + for (int i = 0; i < throughputs.length; i++) { if (throughputs[i] == 0) continue; // Skip unknown values. double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length); double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight). if (objective > max) { max = objective; - choice = inflight; + best = inflight; + choice = i; } + // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness. + if (j != -1) { + double t = throughputs[j]; + if (k != -1) throughputs[j] = (18 * t + throughputs[i] + s) / 20; + s = t; + } + k = j; + j = i; } - long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase. + long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase. + // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration. + if (choice == j && choice + 1 < throughputs.length) + target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length)); targetInflight.set(max(minInflight, min(maxInflight, target))); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index c271ac356e9..d5eab8e17af 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; @@ -57,10 +58,9 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { Compression compression = auto; URI proxy; Duration connectionTtl = Duration.ZERO; + LongSupplier nanoClock = System::nanoTime; - - public FeedClientBuilderImpl() { - } + public FeedClientBuilderImpl() { } FeedClientBuilderImpl(List<URI> endpoints) { this(); @@ -252,6 +252,11 @@ public class FeedClientBuilderImpl implements FeedClientBuilder { return this; } + FeedClientBuilderImpl setNanoClock(LongSupplier nanoClock) { + this.nanoClock = requireNonNull(nanoClock); + return this; + } + /** Constructs instance of {@link ai.vespa.feed.client.FeedClient} from builder configuration */ @Override public FeedClient build() { diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java index 1ea2089c0eb..cec7106403e 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java @@ -12,7 +12,6 @@ import java.util.function.LongSupplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; -import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; @@ -24,22 +23,22 @@ import static java.util.logging.Level.WARNING; public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName()); - private static final long NEVER = 1L << 60; - private final AtomicLong failingSinceMillis = new AtomicLong(NEVER); private final AtomicBoolean halfOpen = new AtomicBoolean(false); private final AtomicBoolean open = new AtomicBoolean(false); - private final LongSupplier clock; + private final LongSupplier nanoClock; + private final long never; + private final AtomicLong failingSinceNanos; private final AtomicReference<String> detail = new AtomicReference<>(); - private final long graceMillis; - private final long doomMillis; + private final long graceNanos; + private final long doomNanos; /** * Creates a new circuit breaker with the given grace periods. * @param grace the period of consecutive failures before state changes to half-open. */ public GracePeriodCircuitBreaker(Duration grace) { - this(System::currentTimeMillis, grace, null); + this(System::nanoTime, grace, null); } /** @@ -48,23 +47,25 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { * @param doom the period of consecutive failures before shutting down. */ public GracePeriodCircuitBreaker(Duration grace, Duration doom) { - this(System::currentTimeMillis, grace, doom); + this(System::nanoTime, grace, doom); if (doom.isNegative()) throw new IllegalArgumentException("Doom delay must be non-negative"); } - GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) { + GracePeriodCircuitBreaker(LongSupplier nanoClock, Duration grace, Duration doom) { if (grace.isNegative()) throw new IllegalArgumentException("Grace delay must be non-negative"); - this.clock = requireNonNull(clock); - this.graceMillis = grace.toMillis(); - this.doomMillis = doom == null ? -1 : doom.toMillis(); + this.nanoClock = requireNonNull(nanoClock); + this.never = nanoClock.getAsLong() + (1L << 60); + this.graceNanos = grace.toNanos(); + this.doomNanos = doom == null ? -1 : doom.toNanos(); + this.failingSinceNanos = new AtomicLong(never); } @Override public void success() { - failingSinceMillis.set(NEVER); + failingSinceNanos.set(never); if ( ! open.get() && halfOpen.compareAndSet(true, false)) log.log(INFO, "Circuit breaker is now closed, after a request was successful"); } @@ -80,21 +81,21 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { } private void failure(String detail) { - if (failingSinceMillis.compareAndSet(NEVER, clock.getAsLong())) + if (failingSinceNanos.compareAndSet(never, nanoClock.getAsLong())) this.detail.set(detail); } @Override public State state() { - long failingMillis = clock.getAsLong() - failingSinceMillis.get(); - if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) + long failingNanos = nanoClock.getAsLong() - failingSinceNanos.get(); + if (failingNanos > graceNanos && halfOpen.compareAndSet(false, true)) log.log(INFO, "Circuit breaker is now half-open, as no requests have succeeded for the " + - "last " + failingMillis + "ms. The server will be pinged to see if it recovers" + - (doomMillis >= 0 ? ", but this client will give up if no successes are observed within " + doomMillis + "ms" : "") + + "last " + failingNanos / 1_000_000 + "ms. The server will be pinged to see if it recovers" + + (doomNanos >= 0 ? ", but this client will give up if no successes are observed within " + doomNanos / 1_000_000 + "ms" : "") + ". First failure was '" + detail.get() + "'."); - if (doomMillis >= 0 && failingMillis > doomMillis && open.compareAndSet(false, true)) - log.log(WARNING, "Circuit breaker is now open, after " + doomMillis + "ms of failing request, " + + if (doomNanos >= 0 && failingNanos > doomNanos && open.compareAndSet(false, true)) + log.log(WARNING, "Circuit breaker is now open, after " + doomNanos / 1_000_000 + "ms of failing request, " + "and this client will give up and abort its remaining feed operations."); return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index a30cfd5ec39..8ee281fb38d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -17,13 +17,13 @@ import com.fasterxml.jackson.core.JsonToken; import com.fasterxml.jackson.core.StreamReadConstraints; import java.io.IOException; +import java.io.UncheckedIOException; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.time.Duration; import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static ai.vespa.feed.client.OperationParameters.empty; @@ -45,6 +46,7 @@ import static java.util.Objects.requireNonNull; */ class HttpFeedClient implements FeedClient { + private static final Duration maxTimeout = Duration.ofMinutes(15); private static final JsonFactory jsonParserFactory = new JsonFactoryBuilder() .streamReadConstraints(StreamReadConstraints.builder().maxStringLength(Integer.MAX_VALUE).build()) .build(); @@ -53,20 +55,23 @@ class HttpFeedClient implements FeedClient { private final RequestStrategy requestStrategy; private final AtomicBoolean closed = new AtomicBoolean(); private final boolean speedTest; + private final LongSupplier nanoClock; HttpFeedClient(FeedClientBuilderImpl builder) throws IOException { - this(builder, builder.dryrun ? new DryrunCluster() : new JettyCluster(builder)); + this(builder, + builder.dryrun ? () -> new DryrunCluster() : () -> new JettyCluster(builder)); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster) { - this(builder, cluster, new HttpRequestStrategy(builder, cluster)); + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { + this(builder, clusterFactory, new HttpRequestStrategy(builder, clusterFactory)); } - HttpFeedClient(FeedClientBuilderImpl builder, Cluster cluster, RequestStrategy requestStrategy) { + HttpFeedClient(FeedClientBuilderImpl builder, ClusterFactory clusterFactory, RequestStrategy requestStrategy) throws IOException { this.requestHeaders = new HashMap<>(builder.requestHeaders); this.requestStrategy = requestStrategy; this.speedTest = builder.speedTest; - verifyConnection(builder, cluster); + this.nanoClock = builder.nanoClock; + verifyConnection(builder, clusterFactory); } @Override @@ -108,10 +113,12 @@ class HttpFeedClient implements FeedClient { throw new IllegalStateException("Client is closed"); HttpRequest request = new HttpRequest(method, - getPath(documentId) + getQuery(params, speedTest), + getPath(documentId), + getQuery(params, speedTest), requestHeaders, operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? - params.timeout().orElse(null)); + params.timeout().orElse(maxTimeout), + nanoClock); CompletableFuture<Result> promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) @@ -129,14 +136,16 @@ class HttpFeedClient implements FeedClient { return promise; } - private void verifyConnection(FeedClientBuilderImpl builder, Cluster cluster) { + private void verifyConnection(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { Instant start = Instant.now(); - try { + try (Cluster cluster = clusterFactory.create()) { HttpRequest request = new HttpRequest("POST", - getPath(DocumentId.of("feeder", "handshake", "dummy")) + getQuery(empty(), true), + getPath(DocumentId.of("feeder", "handshake", "dummy")), + getQuery(empty(), true), requestHeaders, null, - Duration.ofSeconds(15)); + Duration.ofSeconds(15), + nanoClock); CompletableFuture<HttpResponse> future = new CompletableFuture<>(); cluster.dispatch(request, future); HttpResponse response = future.get(20, TimeUnit.SECONDS); @@ -219,13 +228,13 @@ class HttpFeedClient implements FeedClient { throw new ResultParseException(documentId, "Expected 'trace' to be an array, but got '" + parser.currentToken() + "' in: " + new String(json, UTF_8)); - int start = (int) parser.getTokenLocation().getByteOffset(); + int start = (int) parser.currentTokenLocation().getByteOffset(); int depth = 1; while (depth > 0) switch (parser.nextToken()) { case START_ARRAY: ++depth; break; case END_ARRAY: --depth; break; } - int end = (int) parser.getTokenLocation().getByteOffset() + 1; + int end = (int) parser.currentTokenLocation().getByteOffset() + 1; trace = new String(json, start, end - start, UTF_8); break; default: @@ -307,11 +316,17 @@ class HttpFeedClient implements FeedClient { StringJoiner query = new StringJoiner("&", "?", "").setEmptyValue(""); if (params.createIfNonExistent()) query.add("create=true"); params.testAndSetCondition().ifPresent(condition -> query.add("condition=" + encode(condition))); - params.timeout().ifPresent(timeout -> query.add("timeout=" + timeout.toMillis() + "ms")); params.route().ifPresent(route -> query.add("route=" + encode(route))); params.tracelevel().ifPresent(tracelevel -> query.add("tracelevel=" + tracelevel)); if (speedTest) query.add("dryRun=true"); return query.toString(); } + /** Factory for creating a new {@link Cluster} to dispatch operations to. Used for resetting the active cluster. */ + interface ClusterFactory { + + Cluster create() throws IOException; + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index 6de3f034f22..22f6eaa75a4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -3,30 +3,37 @@ package ai.vespa.feed.client.impl; import java.time.Duration; import java.util.Map; +import java.util.function.LongSupplier; import java.util.function.Supplier; class HttpRequest { private final String method; private final String path; + private final String query; private final Map<String, Supplier<String>> headers; private final byte[] body; private final Duration timeout; + private final long deadlineNanos; + private final LongSupplier nanoClock; - public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout) { + public HttpRequest(String method, String path, String query, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { this.method = method; this.path = path; + this.query = query; this.headers = headers; this.body = body; + this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos(); this.timeout = timeout; + this.nanoClock = nanoClock; } public String method() { return method; } - public String path() { - return path; + public String pathAndQuery() { + return path + (query.isEmpty() ? "?" : query + "&") + "timeout=" + Math.max(1, timeLeft().toMillis()) + "ms"; } public Map<String, Supplier<String>> headers() { @@ -37,6 +44,10 @@ class HttpRequest { return body; } + public Duration timeLeft() { + return Duration.ofNanos(deadlineNanos - nanoClock.getAsLong()); + } + public Duration timeout() { return timeout; } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index 8f0327a1738..dc902297d6d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -8,19 +8,25 @@ import ai.vespa.feed.client.FeedClient.RetryStrategy; import ai.vespa.feed.client.FeedException; import ai.vespa.feed.client.HttpResponse; import ai.vespa.feed.client.OperationStats; +import ai.vespa.feed.client.impl.HttpFeedClient.ClusterFactory; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,6 +37,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.FINER; import static java.util.logging.Level.FINEST; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.SEVERE; import static java.util.logging.Level.WARNING; /** @@ -65,10 +73,14 @@ class HttpRequestStrategy implements RequestStrategy { thread.setDaemon(true); return thread; }); + // TODO jonmv: remove if this has no effect + private final ResettableCluster resettableCluster; + private final AtomicBoolean reset = new AtomicBoolean(false); - HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { + HttpRequestStrategy(FeedClientBuilderImpl builder, ClusterFactory clusterFactory) throws IOException { this.throttler = new DynamicThrottler(builder); - this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; + this.resettableCluster = new ResettableCluster(clusterFactory); + this.cluster = builder.benchmark ? new BenchmarkingCluster(resettableCluster, throttler) : resettableCluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; @@ -91,12 +103,18 @@ class HttpRequestStrategy implements RequestStrategy { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); + + if (breaker.state() == HALF_OPEN && reset.compareAndSet(false, true)) + resettableCluster.reset(); + else if (breaker.state() == CLOSED) + reset.set(false); + // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1); } } catch (Throwable t) { - log.log(WARNING, "Dispatch thread threw; shutting down", t); + log.log(SEVERE, "Dispatch thread threw; shutting down", t); } destroy(); } @@ -119,7 +137,7 @@ class HttpRequestStrategy implements RequestStrategy { } private boolean retry(HttpRequest request, int attempt) { - if (attempt > strategy.retries()) + if (attempt > strategy.retries() || request.timeLeft().toMillis() <= 0) return false; switch (request.method().toUpperCase()) { @@ -137,9 +155,8 @@ class HttpRequestStrategy implements RequestStrategy { private boolean retry(HttpRequest request, Throwable thrown, int attempt) { breaker.failure(thrown); if ( (thrown instanceof IOException) // General IO problems. - // Thrown by HTTP2Session.StreamsState.reserveSlot, likely on GOAWAY from server - || (thrown instanceof IllegalStateException && thrown.getMessage().equals("session closed")) + || (thrown instanceof IllegalStateException && "session closed".equals(thrown.getMessage())) ) { log.log(FINER, thrown, () -> "Failed attempt " + attempt + " at " + request); return retry(request, attempt); @@ -149,7 +166,7 @@ class HttpRequestStrategy implements RequestStrategy { return false; } - /** Retries throttled requests (429), adjusting the target inflight count, and server errors (500, 502, 503, 504). */ + /** Retries throttled requests (429), adjusting the target inflight count, and server unavailable (503). */ private boolean retry(HttpRequest request, HttpResponse response, int attempt) { if (response.code() / 100 == 2 || response.code() == 404 || response.code() == 412) { logResponse(FINEST, response, request, attempt); @@ -170,6 +187,10 @@ class HttpRequestStrategy implements RequestStrategy { return retry(request, attempt); } + if (response.code() >= 500) { // Server errors may indicate something wrong with the server. + breaker.failure(response); + } + return false; } @@ -272,7 +293,8 @@ class HttpRequestStrategy implements RequestStrategy { } /** Handles the result of one attempt at the given operation, retrying if necessary. */ - private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, RetriableFuture<HttpResponse> result, int attempt) { + private void handleAttempt(CompletableFuture<HttpResponse> vessel, HttpRequest request, + RetriableFuture<HttpResponse> result, int attempt) { vessel.whenCompleteAsync((response, thrown) -> { result.set(response, thrown); // Retry the operation if it failed with a transient error ... @@ -305,4 +327,72 @@ class HttpRequestStrategy implements RequestStrategy { } } + /** + * Oof, this is an attempt to see if there's a terminal bug in the Jetty client library that sometimes + * renders a client instance permanently unusable. If this is the case, replacing the client altogether + * should allow the feeder to start working again, when it wouldn't otherwise be able to. + */ + private static class ResettableCluster implements Cluster { + + private final Object monitor = new Object(); + private final ClusterFactory clusterFactory; + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + private AtomicLong inflight = new AtomicLong(0); + private Cluster delegate; + + ResettableCluster(ClusterFactory clusterFactory) throws IOException { + this.clusterFactory = clusterFactory; + this.delegate = clusterFactory.create(); + } + + @Override + public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { + synchronized (monitor) { + AtomicLong usedCounter = inflight; + usedCounter.incrementAndGet(); + Cluster usedCluster = delegate; + usedCluster.dispatch(request, vessel); + vessel.whenCompleteAsync((__, ___) -> { + synchronized (monitor) { + if (usedCounter.decrementAndGet() == 0 && usedCluster != delegate) { + log.log(INFO, "Closing old HTTP client"); + usedCluster.close(); + } + } + }, + executor); + } + } + + @Override + public void close() { + synchronized (monitor) { + delegate.close(); + executor.shutdown(); + try { + if ( ! executor.awaitTermination(1, TimeUnit.MINUTES)) + log.log(WARNING, "Failed shutting down HTTP client within 1 minute"); + } + catch (InterruptedException e) { + log.log(WARNING, "Interrupted waiting for HTTP client to shut down"); + Thread.currentThread().interrupt(); + } + } + } + + @Override + public OperationStats stats() { + return delegate.stats(); + } + + void reset() throws IOException { + synchronized (monitor) { + log.log(INFO, "Replacing underlying HTTP client to attempt recovery"); + delegate = clusterFactory.create(); + inflight = new AtomicLong(0); + } + } + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index df010a167f6..28e5b5d0a21 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -45,8 +45,11 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; @@ -61,6 +64,7 @@ import static org.eclipse.jetty.http.MimeTypes.Type.APPLICATION_JSON; * @author bjorncs */ class JettyCluster implements Cluster { + private static final Logger log = Logger.getLogger(JettyCluster.class.getName()); // Socket timeout must be longer than the longest feasible response timeout private static final Duration IDLE_TIMEOUT = Duration.ofMinutes(15); @@ -81,9 +85,12 @@ class JettyCluster implements Cluster { Endpoint endpoint = findLeastBusyEndpoint(endpoints); try { endpoint.inflight.incrementAndGet(); - long reqTimeoutMillis = req.timeout() != null - ? req.timeout().toMillis() * 11 / 10 + 1000 : IDLE_TIMEOUT.toMillis(); - Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) + long reqTimeoutMillis = req.timeLeft().toMillis(); + if (reqTimeoutMillis <= 0) { + vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'")); + return; + } + Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.pathAndQuery())) .version(HttpVersion.HTTP_2) .method(HttpMethod.fromString(req.method())) .headers(hs -> req.headers().forEach((k, v) -> hs.add(k, v.get()))) @@ -104,15 +111,23 @@ class JettyCluster implements Cluster { } jettyReq.body(new BytesRequestContent(APPLICATION_JSON.asString(), bytes)); } + log.log(Level.FINER, () -> + String.format("Dispatching request %s (%s)", req, System.identityHashCode(vessel))); jettyReq.send(new BufferingResponseListener() { @Override public void onComplete(Result result) { + log.log(Level.FINER, () -> + String.format("Completed request %s (%s): %s", + req, System.identityHashCode(vessel), + result.isFailed() + ? result.getFailure().toString() : result.getResponse().getStatus())); endpoint.inflight.decrementAndGet(); if (result.isFailed()) vessel.completeExceptionally(result.getFailure()); else vessel.complete(new JettyResponse(result.getResponse(), getContent())); } }); } catch (Exception e) { + log.log(Level.FINE, e, () -> "Failed to dispatch request: " + e.getMessage()); endpoint.inflight.decrementAndGet(); vessel.completeExceptionally(e); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index 9010b0a7ad8..f0ee434e87c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler { public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size(); - maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. + maxInflight = 256 * minInflight; // 512 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java new file mode 100644 index 00000000000..cea5d32a55a --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java @@ -0,0 +1,30 @@ +package ai.vespa.feed.client.impl; + +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class DynamicThrottlerTest { + + @Test + void testThrottler() { + DynamicThrottler throttler = new DynamicThrottler(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:8080")))); + assertEquals(16, throttler.targetInflight()); + + for (int i = 0; i < 65; i++) { + throttler.sent(1, null); + throttler.success(); + } + assertEquals(18, throttler.targetInflight()); + + throttler.throttled(34); + assertEquals(17, throttler.targetInflight()); + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java index f5ca70fe291..52b8dcc5884 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java @@ -19,39 +19,39 @@ class GracePeriodCircuitBreakerTest { @Test void testCircuitBreaker() { - AtomicLong now = new AtomicLong(0); - long SECOND = 1000; - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); + AtomicLong nowNanos = new AtomicLong(0); + long SECOND = 1_000_000_000L; + CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); Throwable error = new Error(); assertEquals(CLOSED, breaker.state(), "Initial state is closed"); - now.addAndGet(100 * SECOND); + nowNanos.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a success"); - now.addAndGet(100 * SECOND); + nowNanos.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed some time after a success"); breaker.failure(error); assertEquals(CLOSED, breaker.state(), "State is closed right after a failure"); - now.addAndGet(SECOND); + nowNanos.addAndGet(SECOND); assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed"); - now.addAndGet(1); + nowNanos.addAndGet(1); assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a new success"); breaker.failure(error); - now.addAndGet(60 * SECOND); + nowNanos.addAndGet(60 * SECOND); assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed"); - now.addAndGet(1); + nowNanos.addAndGet(1); assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); breaker.success(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java index 28bde16f457..9afaeed8062 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpFeedClientTest.java @@ -11,6 +11,7 @@ import ai.vespa.feed.client.Result; import ai.vespa.feed.client.ResultException; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.net.URI; import java.time.Duration; import java.util.List; @@ -32,7 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class HttpFeedClientTest { @Test - void testFeeding() throws ExecutionException, InterruptedException { + void testFeeding() throws ExecutionException, InterruptedException, IOException { DocumentId id = DocumentId.of("ns", "type", "0"); AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @@ -42,16 +43,18 @@ class HttpFeedClientTest { @Override public void await() { throw new UnsupportedOperationException(); } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } } - FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setDryrun(true), - new DryrunCluster(), + FeedClient client = new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setDryrun(true) + .setNanoClock(() -> 0), + () -> new DryrunCluster(), new MockRequestStrategy()); // Update is a PUT, and 200 OK is a success. dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms", + request.pathAndQuery()); assertEquals("PUT", request.method()); assertEquals("json", new String(request.body(), UTF_8)); @@ -82,8 +85,8 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0?tracelevel=1", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?tracelevel=1&timeout=900000ms", + request.pathAndQuery()); assertEquals("DELETE", request.method()); assertNull(request.body()); @@ -144,8 +147,8 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&timeout=5000ms&route=route", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?create=true&condition=false&route=route&timeout=5000ms", + request.pathAndQuery()); assertEquals("json", new String(request.body(), UTF_8)); HttpResponse response = HttpResponse.of(502, @@ -174,7 +177,7 @@ class HttpFeedClientTest { .timeout(Duration.ofSeconds(5))) .get()); assertTrue(expected.getCause() instanceof ResultException); - assertEquals("Ooops! ... I did it again.", expected.getCause().getMessage()); + assertEquals("(id:ns:type::0) Ooops! ... I did it again.", expected.getCause().getMessage()); assertEquals("[ { \"message\": \"I played with your heart. Got lost in the game.\" } ]", ((ResultException) expected.getCause()).getTrace().get()); @@ -182,8 +185,8 @@ class HttpFeedClientTest { dispatch.set((documentId, request) -> { try { assertEquals(id, documentId); - assertEquals("/document/v1/ns/type/docid/0", - request.path()); + assertEquals("/document/v1/ns/type/docid/0?timeout=900000ms", + request.pathAndQuery()); assertEquals("json", new String(request.body(), UTF_8)); HttpResponse response = HttpResponse.of(500, @@ -207,14 +210,14 @@ class HttpFeedClientTest { "json", OperationParameters.empty()) .get()); - assertEquals("Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); + assertEquals("(id:ns:type::0) Status 500 executing 'POST /document/v1/ns/type/docid/0': Alla ska i jorden.", expected.getCause().getMessage()); } @Test - void testHandshake() { + void testHandshake() throws IOException { // dummy:123 does not exist, and results in a host-not-found exception. FeedException exception = assertThrows(FeedException.class, - () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); + () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))))); String message = exception.getMessage(); assertTrue(message.startsWith("failed handshake with server after "), message); assertTrue(message.contains("java.net.UnknownHostException"), message); @@ -226,7 +229,7 @@ class HttpFeedClientTest { try { assertNull(request.body()); assertEquals("POST", request.method()); - assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true", request.path()); + assertEquals("/document/v1/feeder/handshake/docid/dummy?dryRun=true&timeout=15000ms", request.pathAndQuery()); vessel.complete(response.get()); } catch (Throwable t) { @@ -237,20 +240,24 @@ class HttpFeedClientTest { // Old server, and speed-test. assertEquals("server does not support speed test; upgrade to a newer version", assertThrows(FeedException.class, - () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))).setSpeedTest(true), - cluster, + () -> new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0) + .setSpeedTest(true), + () -> cluster, null)) .getMessage()); // Old server. - new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - cluster, + new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0), + () -> cluster, null); // New server. response.set(okResponse); - new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))), - cluster, + new HttpFeedClient(new FeedClientBuilderImpl(List.of(URI.create("https://dummy:123"))) + .setNanoClock(() -> 0), + () -> cluster, null); } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 54fab9b859b..35d6433ffc5 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -17,11 +17,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -33,23 +35,24 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class HttpRequestStrategyTest { @Test - void testConcurrency() { + void testConcurrency() throws IOException { int documents = 1 << 16; - HttpRequest request = new HttpRequest("PUT", "/", null, null, null); + HttpRequest request = new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), () -> 0); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); - HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setConnectionsPerEndpoint(1 << 10) - .setMaxStreamPerConnection(1 << 12), - cluster); + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) + .setConnectionsPerEndpoint(1 << 10) + .setMaxStreamPerConnection(1 << 12), + () -> cluster); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { try { @@ -80,37 +83,40 @@ class HttpRequestStrategyTest { assertEquals(2 * documents, stats.bytesReceived()); } - @Test - void testRetries() throws ExecutionException, InterruptedException { - int minStreams = 16; // Hard limit for minimum number of streams per connection. + @Test() + void testRetries() throws ExecutionException, InterruptedException, IOException { + int minStreams = 2; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); - AtomicLong now = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + AtomicLong nowNanos = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) - .setRetryStrategy(new FeedClient.RetryStrategy() { - @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } - @Override public int retries() { return 1; } - }) - .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1) - .setMaxStreamPerConnection(minStreams), - cluster); + .setRetryStrategy(new FeedClient.RetryStrategy() { + @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } + @Override public int retries() { return 1; } + }) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1) + .setMaxStreamPerConnection(minStreams), + () -> cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null, null); + HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(180), nowNanos::get); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertTrue(expected.getCause() instanceof FeedException); - assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); + assertInstanceOf(FeedException.class, expected.getCause()); + assertEquals("(id:ns:type::1) java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); // IOException is retried. - cluster.expect((__, vessel) -> vessel.completeExceptionally(new IOException("retry me"))); + cluster.expect((__, vessel) -> { + nowNanos.addAndGet(200_000_000L); // Exceed grace period. + vessel.completeExceptionally(new IOException("retry me")); + }); expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); @@ -123,7 +129,7 @@ class HttpRequestStrategyTest { assertEquals(4, strategy.stats().requests()); // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. - now.set(2000); + nowNanos.set(2_000_000_000L); HttpResponse throttled = HttpResponse.of(429, null); AtomicInteger count = new AtomicInteger(3); CountDownLatch latch = new CountDownLatch(1); @@ -140,11 +146,11 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, null)).get()); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. - now.set(4000); + nowNanos.set(4_000_000_000L); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. completion.get().complete(success); assertEquals(success, delayed.get()); @@ -152,14 +158,17 @@ class HttpRequestStrategyTest { // Some error responses are retried. HttpResponse serverError = HttpResponse.of(503, null); - cluster.expect((__, vessel) -> vessel.complete(serverError)); + cluster.expect((__, vessel) -> { + nowNanos.addAndGet(200_000_000L); // Exceed grace period. + vessel.complete(serverError); + }); assertEquals(serverError, strategy.enqueue(id1, request).get()); assertEquals(11, strategy.stats().requests()); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, null)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -168,10 +177,22 @@ class HttpRequestStrategyTest { assertEquals(badRequest, strategy.enqueue(id1, request).get()); assertEquals(13, strategy.stats().requests()); + + // IOException is not retried past timeout. + cluster.expect((__, vessel) -> { + nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts. + vessel.completeExceptionally(new IOException("retry me")); + }); + expected = assertThrows(ExecutionException.class, + () -> strategy.enqueue(id1, new HttpRequest("POST", "/", "", null, null, Duration.ofMillis(100), nowNanos::get)).get()); + assertEquals("retry me", expected.getCause().getCause().getMessage()); + assertEquals(15, strategy.stats().requests()); + + // Circuit breaker opens some time after starting to fail. - now.set(6000); + nowNanos.set(6_000_000_000L); assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests. - now.set(605000); + nowNanos.set(605_000_000_000L); assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests. strategy.destroy(); @@ -182,7 +203,7 @@ class HttpRequestStrategyTest { codes.put(429, 2L); codes.put(503, 3L); assertEquals(codes, stats.responsesByCode()); - assertEquals(3, stats.exceptions()); + assertEquals(5, stats.exceptions()); assertEquals(stats, stats.since(initial)); assertEquals(0, stats.since(stats).averageLatencyMillis()); @@ -191,27 +212,73 @@ class HttpRequestStrategyTest { } @Test - void testShutdown() { - MockCluster cluster = new MockCluster(); + void testResettingCluster() throws ExecutionException, InterruptedException, IOException { + List<MockCluster> clusters = List.of(new MockCluster(), new MockCluster()); AtomicLong now = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), null); + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) + .setCircuitBreaker(breaker) + .setConnectionsPerEndpoint(1), + clusters.iterator()::next); + + // First operation fails, second remains in flight, and third fails. + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); + strategy.enqueue(DocumentId.of("ns", "type", "1"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get(); + Exchanger<CompletableFuture<HttpResponse>> exchanger = new Exchanger<>(); + clusters.get(0).expect((__, vessel) -> { + try { exchanger.exchange(vessel); } catch (InterruptedException e) { throw new RuntimeException(e); } + }); + CompletableFuture<HttpResponse> secondResponse = strategy.enqueue(DocumentId.of("ns", "type", "2"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)); + CompletableFuture<HttpResponse> secondVessel = exchanger.exchange(null); + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + strategy.enqueue(DocumentId.of("ns", "type", "3"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get(); + + // Time advances, and the circuit breaker half-opens. + assertEquals(CLOSED, breaker.state()); + now.addAndGet(2_000_000_000); + assertEquals(HALF_OPEN, breaker.state()); + + // It's indeterminate which cluster gets the next request, but the second should get the next one after that. + clusters.get(0).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(500, null))); + assertEquals(500, strategy.enqueue(DocumentId.of("ns", "type", "4"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code()); + + clusters.get(0).expect((__, vessel) -> vessel.completeExceptionally(new AssertionError("should not be called"))); + clusters.get(1).expect((__, vessel) -> vessel.complete(HttpResponse.of(200, null))); + assertEquals(200, strategy.enqueue(DocumentId.of("ns", "type", "5"), new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), now::get)).get().code()); + + assertFalse(clusters.get(0).closed.get()); + assertFalse(clusters.get(1).closed.get()); + secondVessel.complete(HttpResponse.of(504, null)); + assertEquals(504, secondResponse.get().code()); + strategy.await(); + strategy.destroy(); + assertTrue(clusters.get(0).closed.get()); + assertTrue(clusters.get(1).closed.get()); + } + + @Test + void testShutdown() throws IOException { + MockCluster cluster = new MockCluster(); + AtomicLong nowNanos = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public int retries() { return 1; } }) .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1), - cluster); + .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. + () -> cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); DocumentId id5 = DocumentId.of("ns", "type", "5"); - HttpRequest failing = new HttpRequest("POST", "/", null, null, null); - HttpRequest partial = new HttpRequest("POST", "/", null, null, null); - HttpRequest request = new HttpRequest("POST", "/", null, null, null); - HttpRequest blocking = new HttpRequest("POST", "/", null, null, null); + HttpRequest failing = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest partial = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest blocking = new HttpRequest("POST", "/", "", null, null, Duration.ofSeconds(1), nowNanos::get); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); @@ -242,7 +309,7 @@ class HttpRequestStrategyTest { CompletableFuture<HttpResponse> delayed = strategy.enqueue(id5, request); phaser.arriveAndAwaitAdvance(); // retried is allowed to dispatch, and will be retried async. // failed immediately fails, and lets us assert the above retry is indeed enqueued. - assertEquals("ai.vespa.feed.client.FeedException: java.lang.RuntimeException: fatal", + assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::3) java.lang.RuntimeException: fatal", assertThrows(ExecutionException.class, failed::get).getMessage()); phaser.arriveAndAwaitAdvance(); // blocked starts dispatch, and hangs, blocking dispatch thread. @@ -269,7 +336,7 @@ class HttpRequestStrategyTest { assertThrows(ExecutionException.class, blocked::get).getMessage()); assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", assertThrows(ExecutionException.class, delayed::get).getMessage()); - assertEquals("ai.vespa.feed.client.FeedException: java.io.IOException: failed", + assertEquals("ai.vespa.feed.client.FeedException: (id:ns:type::2) java.io.IOException: failed", assertThrows(ExecutionException.class, retried::get).getMessage()); assertEquals("ai.vespa.feed.client.FeedException: Operation aborted", assertThrows(ExecutionException.class, strategy.enqueue(id1, request)::get).getMessage()); @@ -278,6 +345,7 @@ class HttpRequestStrategyTest { static class MockCluster implements Cluster { final AtomicReference<BiConsumer<HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); + final AtomicBoolean closed = new AtomicBoolean(false); void expect(BiConsumer<HttpRequest, CompletableFuture<HttpResponse>> expected) { dispatch.set(expected); @@ -288,6 +356,11 @@ class HttpRequestStrategyTest { dispatch.get().accept(request, vessel); } + @Override + public void close() { + closed.set(true); + } + } } |