From c571345c4599e837943d42a33bbb22e9debefa14 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Thu, 3 Jun 2021 14:58:26 +0200 Subject: Support mulitple endpoints, circuit breaker, and improve concurrency --- .../main/java/ai/vespa/feed/client/CliClient.java | 2 +- .../main/java/ai/vespa/feed/client/FeedClient.java | 52 ++++- .../ai/vespa/feed/client/FeedClientBuilder.java | 40 ++-- .../feed/client/GracePeriodCircuitBreaker.java | 62 ++++++ .../java/ai/vespa/feed/client/HttpFeedClient.java | 98 ++++++--- .../ai/vespa/feed/client/HttpRequestStrategy.java | 240 ++++++++++----------- .../ai/vespa/feed/client/JsonStreamFeeder.java | 18 +- .../java/ai/vespa/feed/client/RequestStrategy.java | 7 + .../ai/vespa/feed/client/JsonStreamFeederTest.java | 3 +- 9 files changed, 340 insertions(+), 182 deletions(-) create mode 100644 vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java index 2f15f468588..e3f726eaf11 100644 --- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java +++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java @@ -74,7 +74,7 @@ public class CliClient { private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException { FeedClientBuilder builder = FeedClientBuilder.create(cliArgs.endpoint()); - cliArgs.connections().ifPresent(builder::setMaxConnections); + cliArgs.connections().ifPresent(builder::setConnectionsPerEndpoint); cliArgs.maxStreamsPerConnection().ifPresent(builder::setMaxStreamPerConnection); if (cliArgs.sslHostnameVerificationDisabled()) { builder.setHostnameVerifier(AcceptAllHostnameVerifier.INSTANCE); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index 455a79060ee..2ac75a948d9 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -10,10 +10,22 @@ import java.util.concurrent.CompletableFuture; */ public interface FeedClient extends Closeable { + /** Send a document put with the given parameters, returning a future with the result of the operation. */ CompletableFuture put(DocumentId documentId, String documentJson, OperationParameters params); + + /** Send a document update with the given parameters, returning a future with the result of the operation. */ CompletableFuture update(DocumentId documentId, String updateJson, OperationParameters params); + + /** Send a document remove with the given parameters, returning a future with the result of the operation. */ CompletableFuture remove(DocumentId documentId, OperationParameters params); + /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ + void close(boolean graceful); + + /** Initiates graceful shutdown. See {@link #close(boolean)}. */ + default void close() { close(true); } + + /** Controls what to retry, and how many times. */ interface RetryStrategy { /** Whether to retry operations of the given type. */ @@ -24,10 +36,44 @@ public interface FeedClient extends Closeable { } + /** Allows slowing down or halting completely operations against the configured endpoint on high failure rates. */ + interface CircuitBreaker { + + /** Called by the client whenever a successful response is obtained. */ + void success(); + + /** Called by the client whenever a transient or fatal error occurs. */ + void failure(); + + /** The current state of the circuit breaker. */ + State state(); + + enum State { + + /** Circuit is closed: business as usual. */ + CLOSED, + + /** Circuit is half-open: something is wrong, perhaps it recovers? */ + HALF_OPEN, + + /** Circuit is open: we have given up. */ + OPEN; + + } + + } + enum OperationType { - put, - update, - remove; + + /** A document put operation. This is idempotent. */ + PUT, + + /** A document update operation. This is idempotent if all its contained updates are. */ + UPDATE, + + /** A document remove operation. This is idempotent. */ + REMOVE; + } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index eaf84c67ac4..da575a7cf6d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -8,7 +8,11 @@ import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.Path; import java.time.Clock; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Supplier; @@ -22,38 +26,45 @@ import static java.util.Objects.requireNonNull; */ public class FeedClientBuilder { - FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { }; + static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { }; - final URI endpoint; + final List endpoints; final Map> requestHeaders = new HashMap<>(); SSLContext sslContext; HostnameVerifier hostnameVerifier; - int maxConnections = 4; - int maxStreamsPerConnection = 1024; + int connectionsPerEndpoint = 4; + int maxStreamsPerConnection = 128; FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy; + FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Clock.systemUTC(), Duration.ofSeconds(1), Duration.ofMinutes(10)); Path certificate; Path privateKey; Path caCertificates; - Clock clock; - public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(endpoint); } + public static FeedClientBuilder create(URI endpoint) { return new FeedClientBuilder(Collections.singletonList(endpoint)); } - private FeedClientBuilder(URI endpoint) { - requireNonNull(endpoint.getHost()); - this.endpoint = endpoint; + public static FeedClientBuilder create(List endpoints) { return new FeedClientBuilder(endpoints); } + + private FeedClientBuilder(List endpoints) { + if (endpoints.isEmpty()) + throw new IllegalArgumentException("At least one endpoint must be provided"); + + for (URI endpoint : endpoints) + requireNonNull(endpoint.getHost()); + + this.endpoints = new ArrayList<>(endpoints); } /** - * Sets the maximum number of connections this client will use. + * Sets the number of connections this client will use per endpoint. * * A reasonable value here is a small multiple of the numbers of containers in the * cluster to feed, so load can be balanced across these. * In general, this value should be kept as low as possible, but poor connectivity * between feeder and cluster may also warrant a higher number of connections. */ - public FeedClientBuilder setMaxConnections(int max) { + public FeedClientBuilder setConnectionsPerEndpoint(int max) { if (max < 1) throw new IllegalArgumentException("Max connections must be at least 1, but was " + max); - this.maxConnections = max; + this.connectionsPerEndpoint = max; return this; } @@ -97,6 +108,11 @@ public class FeedClientBuilder { return this; } + public FeedClientBuilder setCircuitBreaker(FeedClient.CircuitBreaker breaker) { + this.circuitBreaker = requireNonNull(breaker); + return this; + } + public FeedClientBuilder setCertificate(Path certificatePemFile, Path privateKeyPemFile) { if (sslContext != null) throw new IllegalArgumentException("Cannot set both SSLContext and certificate"); this.certificate = certificatePemFile; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java new file mode 100644 index 00000000000..974d18418ec --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java @@ -0,0 +1,62 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.time.Clock; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.INFO; +import static java.util.logging.Level.WARNING; + +/** + * Breaks the circuit when no successes have been recorded for a specified time. + */ +public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { + + private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName()); + + private final AtomicLong lastSuccessMillis = new AtomicLong(0); // Trigger if first response is a failure. + private final AtomicBoolean halfOpen = new AtomicBoolean(false); + private final AtomicBoolean open = new AtomicBoolean(false); + private final Clock clock; + private final long graceMillis; + private final long doomMillis; + + GracePeriodCircuitBreaker(Clock clock, Duration grace, Duration doom) { + if (grace.isNegative()) + throw new IllegalArgumentException("Grace delay must be non-negative"); + + if (doom.isNegative()) + throw new IllegalArgumentException("Doom delay must be non-negative"); + + this.clock = requireNonNull(clock); + this.graceMillis = grace.toMillis(); + this.doomMillis = doom.toMillis(); + } + + @Override + public void success() { + lastSuccessMillis.set(clock.millis()); + if (halfOpen.compareAndSet(true, false)) + log.log(INFO, "Circuit breaker is now closed"); + } + + @Override + public void failure() { + long nowMillis = clock.millis(); + if (lastSuccessMillis.get() < nowMillis - doomMillis && open.compareAndSet(false, true)) + log.log(WARNING, "Circuit breaker is now open"); + + if (lastSuccessMillis.get() < nowMillis - graceMillis && halfOpen.compareAndSet(false, true)) + log.log(INFO, "Circuit breaker is now half-open"); + } + + @Override + public State state() { + return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED; + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 8a38e859ca4..644c387acd1 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -11,6 +11,7 @@ import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.net.URIAuthority; import org.apache.hc.core5.net.URIBuilder; import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.util.Timeout; @@ -19,9 +20,9 @@ import javax.net.ssl.SSLContext; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.io.UncheckedIOException; import java.net.URI; import java.net.URISyntaxException; -import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -44,24 +45,32 @@ import static org.apache.hc.core5.http.ssl.TlsCiphers.excludeWeak; */ class HttpFeedClient implements FeedClient { - private final URI endpoint; private final Map> requestHeaders; private final RequestStrategy requestStrategy; - private final List httpClients = new ArrayList<>(); - private final List inflight = new ArrayList<>(); + private final List endpoints = new ArrayList<>(); private final AtomicBoolean closed = new AtomicBoolean(); HttpFeedClient(FeedClientBuilder builder) throws IOException { - this.endpoint = builder.endpoint; this.requestHeaders = new HashMap<>(builder.requestHeaders); - this.requestStrategy = new HttpRequestStrategy(builder, Clock.systemUTC()); + this.requestStrategy = new HttpRequestStrategy(builder); + for (URI endpoint : builder.endpoints) + for (int i = 0; i < builder.connectionsPerEndpoint; i++) + endpoints.add(new Endpoint(createHttpClient(builder), endpoint)); + } + + private static class Endpoint { + + private final CloseableHttpAsyncClient client; + private final AtomicInteger inflight = new AtomicInteger(0); + private final URI url; - for (int i = 0; i < builder.maxConnections; i++) { - CloseableHttpAsyncClient client = createHttpClient(builder); - client.start(); - httpClients.add(client); - inflight.add(new AtomicInteger()); + private Endpoint(CloseableHttpAsyncClient client, URI url) { + this.client = client; + this.url = url; + + this.client.start(); } + } private static CloseableHttpAsyncClient createHttpClient(FeedClientBuilder builder) throws IOException { @@ -129,14 +138,37 @@ class HttpFeedClient implements FeedClient { } @Override - public void close() throws IOException { - if ( ! closed.getAndSet(true)) - for (CloseableHttpAsyncClient hc : httpClients) - hc.close(); + public void close(boolean graceful) { + closed.set(true); + if (graceful) + requestStrategy.await(); + + requestStrategy.destroy(); + Throwable thrown = null; + for (Endpoint endpoint : endpoints) + try { + endpoint.client.close(); + } + catch (Throwable t) { + if (thrown == null) thrown = t; + else thrown.addSuppressed(t); + } + if (thrown != null) throw new RuntimeException(thrown); + } + + private void ensureOpen() { + if (requestStrategy.hasFailed()) + close(); + + if (closed.get()) + throw new IllegalStateException("Client is closed, no further operations may be sent"); } private CompletableFuture send(String method, DocumentId documentId, String operationJson, OperationParameters params) { - SimpleHttpRequest request = new SimpleHttpRequest(method, operationUrl(endpoint, documentId, params)); + ensureOpen(); + + String path = operationPath(documentId, params).toString(); + SimpleHttpRequest request = new SimpleHttpRequest(method, path); requestHeaders.forEach((name, value) -> request.setHeader(name, value.get())); if (operationJson != null) request.setBody(operationJson, ContentType.APPLICATION_JSON); @@ -144,10 +176,7 @@ class HttpFeedClient implements FeedClient { return requestStrategy.enqueue(documentId, request, this::send) .handle((response, thrown) -> { if (thrown != null) { - if (requestStrategy.hasFailed()) { - try { close(); } - catch (IOException exception) { thrown.addSuppressed(exception); } - } + // TODO: What to do with exceptions here? Ex on 400, 401, 403, etc, and wrap and throw? ByteArrayOutputStream buffer = new ByteArrayOutputStream(); thrown.printStackTrace(new PrintStream(buffer)); return new Result(Result.Type.failure, documentId, buffer.toString(), null); @@ -160,25 +189,28 @@ class HttpFeedClient implements FeedClient { private void send(SimpleHttpRequest request, CompletableFuture vessel) { int index = 0; int min = Integer.MAX_VALUE; - for (int i = 0; i < httpClients.size(); i++) - if (inflight.get(i).get() < min) { - min = inflight.get(i).get(); + for (int i = 0; i < endpoints.size(); i++) + if (endpoints.get(i).inflight.get() < min) { index = i; + min = endpoints.get(i).inflight.get(); } - inflight.get(index).incrementAndGet(); + Endpoint endpoint = endpoints.get(index); + endpoint.inflight.incrementAndGet(); try { - httpClients.get(index).execute(request, - new FutureCallback() { - @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } - @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } - @Override public void cancelled() { vessel.cancel(false); } - }); + request.setScheme(endpoint.url.getScheme()); + request.setAuthority(new URIAuthority(endpoint.url.getHost(), endpoint.url.getPort())); + endpoint.client.execute(request, + new FutureCallback() { + @Override public void completed(SimpleHttpResponse response) { vessel.complete(response); } + @Override public void failed(Exception ex) { vessel.completeExceptionally(ex); } + @Override public void cancelled() { vessel.cancel(false); } + }); } catch (Throwable thrown) { vessel.completeExceptionally(thrown); } - vessel.thenRun(inflight.get(index)::decrementAndGet); + vessel.whenComplete((__, ___) -> endpoint.inflight.decrementAndGet()); } static Result toResult(SimpleHttpResponse response, DocumentId documentId) { @@ -214,8 +246,8 @@ class HttpFeedClient implements FeedClient { return path; } - static URI operationUrl(URI endpoint, DocumentId documentId, OperationParameters params) { - URIBuilder url = new URIBuilder(endpoint); + static URI operationPath(DocumentId documentId, OperationParameters params) { + URIBuilder url = new URIBuilder(); url.setPathSegments(toPath(documentId)); if (params.createIfNonExistent()) url.addParameter("create", "true"); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index d0d67d65446..a3a29412254 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -1,26 +1,29 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import ai.vespa.feed.client.FeedClient.CircuitBreaker; import ai.vespa.feed.client.FeedClient.RetryStrategy; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import java.io.IOException; -import java.time.Clock; -import java.time.Instant; -import java.util.HashMap; import java.util.Map; -import java.util.concurrent.BlockingQueue; +import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.logging.Logger; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.logging.Level.FINE; -import static java.util.logging.Level.INFO; +// TODO: update doc /** * Controls request execution and retries: *
    @@ -31,57 +34,73 @@ import static java.util.logging.Level.INFO; * * @author jonmv */ -class HttpRequestStrategy implements RequestStrategy, AutoCloseable { +class HttpRequestStrategy implements RequestStrategy { private static final Logger log = Logger.getLogger(HttpRequestStrategy.class.getName()); - private final Map> inflightById = new HashMap<>(); - private final Object monitor = new Object(); - private final Clock clock; - private final RetryStrategy wrapped; - private final Thread delayer = new Thread(this::drainDelayed, "feed-client-retry-delayer"); - private final BlockingQueue> delayed = new LinkedBlockingQueue<>(); + private final Map> inflightById = new ConcurrentHashMap<>(); + private final RetryStrategy strategy; + private final CircuitBreaker breaker; + private final Queue queue = new ConcurrentLinkedQueue<>(); private final long maxInflight; private final long minInflight; - private double targetInflight; - private long inflight = 0; - private long consecutiveSuccesses = 0; - private Instant lastSuccess; - private boolean failed = false; - private boolean closed = false; - - HttpRequestStrategy(FeedClientBuilder builder, Clock clock) { - this.wrapped = builder.retryStrategy; - this.maxInflight = builder.maxConnections * (long) builder.maxStreamsPerConnection; - this.minInflight = builder.maxConnections * (long) min(16, builder.maxStreamsPerConnection); - this.targetInflight = Math.sqrt(maxInflight) * (Math.sqrt(minInflight)); - this.clock = clock; - this.lastSuccess = clock.instant(); - this.delayer.start(); - } - - private void drainDelayed() { - try { - while (true) { - do delayed.take().complete(null); - while ( ! hasFailed()); + private final AtomicLong targetInflightX10; // 10x target, so we can increment one every tenth success. + private final AtomicLong inflight = new AtomicLong(0); + private final AtomicBoolean destroyed = new AtomicBoolean(false); + private final AtomicLong delayedCount = new AtomicLong(0); + private final AtomicLong retries = new AtomicLong(0); + + HttpRequestStrategy(FeedClientBuilder builder) { + this.strategy = builder.retryStrategy; + this.breaker = builder.circuitBreaker; + this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection; + this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection); + this.targetInflightX10 = new AtomicLong(10 * (long) (Math.sqrt(minInflight) * Math.sqrt(maxInflight))); + new Thread(this::dispatch, "feed-client-dispatcher").start(); + } - Thread.sleep(1000); + private void dispatch() { + try { + while ( ! destroyed.get()) { + CircuitBreaker.State state = breaker.state(); + if (state == OPEN) destroy(); + else while ( ! isInExcess()) + if ( ! poll() || breaker.state() == HALF_OPEN) break; + + // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); } } catch (InterruptedException e) { - delayed.forEach(action -> action.cancel(true)); + destroy(); } } + private void offer(Runnable task) { + delayedCount.incrementAndGet(); + queue.offer(task); + } + + private boolean poll() { + Runnable task = queue.poll(); + if (task == null) return false; + delayedCount.decrementAndGet(); + task.run(); + return true; + } + + private boolean isInExcess() { + return inflight.get() - delayedCount.get() > targetInflight(); + } + private boolean retry(SimpleHttpRequest request, int attempt) { - if (attempt >= wrapped.retries()) + if (attempt >= strategy.retries()) return false; switch (request.getMethod().toUpperCase()) { - case "POST": return wrapped.retry(FeedClient.OperationType.put); - case "PUT": return wrapped.retry(FeedClient.OperationType.update); - case "DELETE": return wrapped.retry(FeedClient.OperationType.remove); + case "POST": return strategy.retry(FeedClient.OperationType.PUT); + case "PUT": return strategy.retry(FeedClient.OperationType.UPDATE); + case "DELETE": return strategy.retry(FeedClient.OperationType.REMOVE); default: throw new IllegalStateException("Unexpected HTTP method: " + request.getMethod()); } } @@ -91,8 +110,8 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { * or the user has turned off retries for this type of operation. */ private boolean retry(SimpleHttpRequest request, Throwable thrown, int attempt) { - failure(); - log.log(INFO, thrown, () -> "Failed attempt " + attempt + " at " + request + ", " + consecutiveSuccesses + " successes since last error"); + breaker.failure(); + log.log(FINE, thrown, () -> "Failed attempt " + attempt + " at " + request); if ( ! (thrown instanceof IOException)) return false; @@ -100,74 +119,69 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { return retry(request, attempt); } - void success() { - Instant now = clock.instant(); - synchronized (monitor) { - ++consecutiveSuccesses; - lastSuccess = now; - targetInflight = min(targetInflight + 0.1, maxInflight); - } + private void incrementTargetInflight() { + targetInflightX10.incrementAndGet(); } - void failure() { - Instant threshold = clock.instant().minusSeconds(300); - synchronized (monitor) { - consecutiveSuccesses = 0; - if (lastSuccess.isBefore(threshold)) - failed = true; - } + private void decreaseTargetInflight() { + targetInflightX10.set(max((inflight.get() - delayedCount.get()) * 9, minInflight * 10)); + } + + private long targetInflight() { + return min(targetInflightX10.get() / 10, maxInflight); } /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */ private boolean retry(SimpleHttpRequest request, SimpleHttpResponse response, int attempt) { if (response.getCode() / 100 == 2) { - success(); + breaker.success(); + incrementTargetInflight(); return false; } - if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight. - synchronized (monitor) { - targetInflight = max(inflight * 0.9, minInflight); - } - log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt + - " at " + request + ", " + consecutiveSuccesses + " successes since last error"); + log.log(FINE, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + + ") on attempt " + attempt + " at " + request); + if (response.getCode() == 429 || response.getCode() == 503) { // Throttling; reduce target inflight. + decreaseTargetInflight(); return true; } - log.log(INFO, () -> "Status code " + response.getCode() + " (" + response.getBodyText() + ") on attempt " + attempt + - " at " + request + ", " + consecutiveSuccesses + " successes since last error"); - - failure(); - if (response.getCode() != 500 && response.getCode() != 502) - return false; + breaker.failure(); + if (response.getCode() == 500 || response.getCode() == 502 || response.getCode() == 504) // Hopefully temporary errors. + return retry(request, attempt); - return retry(request, attempt); // Hopefully temporary errors. + return false; } - // Must hold lock. private void acquireSlot() { try { - while (inflight >= targetInflight) - monitor.wait(); + while (inflight.get() >= targetInflight()) + Thread.sleep(1); - ++inflight; + inflight.incrementAndGet(); } catch (InterruptedException e) { throw new RuntimeException(e); } } - // Must hold lock. private void releaseSlot() { - for (long i = --inflight; i < targetInflight; i++) - monitor.notify(); + inflight.decrementAndGet(); } @Override public boolean hasFailed() { - synchronized (monitor) { - return failed; + return breaker.state() == OPEN; + } + + public void await() { + try { + while (inflight.get() > 0) + Thread.sleep(10); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } @@ -176,33 +190,24 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { BiConsumer> dispatch) { CompletableFuture result = new CompletableFuture<>(); // Carries the aggregate result of the operation, including retries. CompletableFuture vessel = new CompletableFuture<>(); // Holds the computation of a single dispatch to the HTTP client. - CompletableFuture blocker = new CompletableFuture<>(); // Blocks the next operation with same doc-id, then triggers it when complete. - - // Get the previous inflight operation for this doc-id, or acquire a send slot. - CompletableFuture previous; - synchronized (monitor) { - previous = inflightById.put(documentId, blocker); - if (previous == null) - acquireSlot(); + CompletableFuture previous = inflightById.put(documentId, result); + if (destroyed.get()) { + result.cancel(true); + return result; } - if (previous == null) // Send immediately if none inflight ... + + if (previous == null) { + acquireSlot(); dispatch.accept(request, vessel); - else // ... or send when the previous inflight is done. - previous.thenRun(() -> dispatch.accept(request, vessel)); + } + else + previous.whenComplete((__, ___) -> offer(() -> dispatch.accept(request, vessel))); handleAttempt(vessel, dispatch, request, result, 1); - result.thenRun(() -> { - CompletableFuture current; - synchronized (monitor) { - current = inflightById.get(documentId); - if (current == blocker) { // Release slot and clear map if no other operations enqueued for this doc-id ... - releaseSlot(); - inflightById.put(documentId, null); - } - } - if (current != blocker) // ... or trigger sending the next enqueued operation. - blocker.complete(null); + result.whenComplete((__, ___) -> { + if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null) + releaseSlot(); }); return result; @@ -215,33 +220,24 @@ class HttpRequestStrategy implements RequestStrategy, AutoCloseable { // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { + retries.incrementAndGet(); + CircuitBreaker.State state = breaker.state(); CompletableFuture retry = new CompletableFuture<>(); - boolean hasFailed = hasFailed(); - if (hasFailed) - delayed.add(new CompletableFuture<>().thenRun(() -> dispatch.accept(request, retry))); - else - dispatch.accept(request, retry); - handleAttempt(retry, dispatch, request, result, attempt + (hasFailed ? 0 : 1)); - return; + offer(() -> dispatch.accept(request, retry)); + handleAttempt(retry, dispatch, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); } - // ... or accept the outcome and mark the operation as complete. - if (thrown == null) result.complete(response); - else result.completeExceptionally(thrown); + else { + if (thrown == null) result.complete(response); + else result.completeExceptionally(thrown); + } }); } @Override - public void close() { - synchronized (monitor) { - if (closed) - return; - - closed = true; - } - delayer.interrupt(); - try { delayer.join(); } - catch (InterruptedException e) { Thread.currentThread().interrupt(); } + public void destroy() { + if ( ! destroyed.getAndSet(true)) + inflightById.values().forEach(result -> result.cancel(true)); } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java index 17162f19d3f..99d05a4bae8 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java @@ -17,9 +17,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static ai.vespa.feed.client.FeedClient.OperationType.put; -import static ai.vespa.feed.client.FeedClient.OperationType.remove; -import static ai.vespa.feed.client.FeedClient.OperationType.update; +import static ai.vespa.feed.client.FeedClient.OperationType.PUT; +import static ai.vespa.feed.client.FeedClient.OperationType.REMOVE; +import static ai.vespa.feed.client.FeedClient.OperationType.UPDATE; import static com.fasterxml.jackson.core.JsonToken.START_OBJECT; import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE; import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING; @@ -187,9 +187,9 @@ public class JsonStreamFeeder implements Closeable { case FIELD_NAME: switch (parser.getText()) { case "id": - case "put": type = put; id = readId(); break; - case "update": type = update; id = readId(); break; - case "remove": type = remove; id = readId(); break; + case "put": type = PUT; id = readId(); break; + case "update": type = UPDATE; id = readId(); break; + case "remove": type = REMOVE; id = readId(); break; case "condition": parameters = parameters.testAndSetCondition(readString()); break; case "create": parameters = parameters.createIfNonExistent(readBoolean()); break; case "fields": { @@ -230,9 +230,9 @@ public class JsonStreamFeeder implements Closeable { } switch (type) { - case put: return client.put (id, payload, parameters); - case update: return client.update(id, payload, parameters); - case remove: return client.remove(id, parameters); + case PUT: return client.put (id, payload, parameters); + case UPDATE: return client.update(id, payload, parameters); + case REMOVE: return client.remove(id, parameters); default: throw new IllegalStateException("Unexpected operation type '" + type + "'"); } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index 1787d8d65c6..c3bb4573fd4 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -4,6 +4,7 @@ package ai.vespa.feed.client; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import java.io.Closeable; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; @@ -17,6 +18,12 @@ public interface RequestStrategy { /** Whether this has failed fatally, and we should cease sending further operations. */ boolean hasFailed(); + /** Forcibly terminates this, causing all inflight operations to complete immediately. */ + void destroy(); + + /** Wait for all inflight requests to complete. */ + void await(); + /** Enqueue the given operation, returning its future result. This may block if the client send queue is full. */ CompletableFuture enqueue(DocumentId documentId, SimpleHttpRequest request, BiConsumer> dispatch); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java index 8ef8ae57f5e..8db0b8f2d43 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java @@ -57,9 +57,8 @@ class JsonStreamFeederTest { } @Override - public void close() throws IOException { + public void close(boolean graceful) { } - } }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6. assertEquals(docs + 1, ids.size()); } -- cgit v1.2.3