diff options
Diffstat (limited to 'vespa-feed-client')
6 files changed, 65 insertions, 58 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java index 1ea2089c0eb..cec7106403e 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java @@ -12,7 +12,6 @@ import java.util.function.LongSupplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; -import static java.util.logging.Level.FINE; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; @@ -24,22 +23,22 @@ import static java.util.logging.Level.WARNING; public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName()); - private static final long NEVER = 1L << 60; - private final AtomicLong failingSinceMillis = new AtomicLong(NEVER); private final AtomicBoolean halfOpen = new AtomicBoolean(false); private final AtomicBoolean open = new AtomicBoolean(false); - private final LongSupplier clock; + private final LongSupplier nanoClock; + private final long never; + private final AtomicLong failingSinceNanos; private final AtomicReference<String> detail = new AtomicReference<>(); - private final long graceMillis; - private final long doomMillis; + private final long graceNanos; + private final long doomNanos; /** * Creates a new circuit breaker with the given grace periods. * @param grace the period of consecutive failures before state changes to half-open. */ public GracePeriodCircuitBreaker(Duration grace) { - this(System::currentTimeMillis, grace, null); + this(System::nanoTime, grace, null); } /** @@ -48,23 +47,25 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { * @param doom the period of consecutive failures before shutting down. */ public GracePeriodCircuitBreaker(Duration grace, Duration doom) { - this(System::currentTimeMillis, grace, doom); + this(System::nanoTime, grace, doom); if (doom.isNegative()) throw new IllegalArgumentException("Doom delay must be non-negative"); } - GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) { + GracePeriodCircuitBreaker(LongSupplier nanoClock, Duration grace, Duration doom) { if (grace.isNegative()) throw new IllegalArgumentException("Grace delay must be non-negative"); - this.clock = requireNonNull(clock); - this.graceMillis = grace.toMillis(); - this.doomMillis = doom == null ? -1 : doom.toMillis(); + this.nanoClock = requireNonNull(nanoClock); + this.never = nanoClock.getAsLong() + (1L << 60); + this.graceNanos = grace.toNanos(); + this.doomNanos = doom == null ? -1 : doom.toNanos(); + this.failingSinceNanos = new AtomicLong(never); } @Override public void success() { - failingSinceMillis.set(NEVER); + failingSinceNanos.set(never); if ( ! open.get() && halfOpen.compareAndSet(true, false)) log.log(INFO, "Circuit breaker is now closed, after a request was successful"); } @@ -80,21 +81,21 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { } private void failure(String detail) { - if (failingSinceMillis.compareAndSet(NEVER, clock.getAsLong())) + if (failingSinceNanos.compareAndSet(never, nanoClock.getAsLong())) this.detail.set(detail); } @Override public State state() { - long failingMillis = clock.getAsLong() - failingSinceMillis.get(); - if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) + long failingNanos = nanoClock.getAsLong() - failingSinceNanos.get(); + if (failingNanos > graceNanos && halfOpen.compareAndSet(false, true)) log.log(INFO, "Circuit breaker is now half-open, as no requests have succeeded for the " + - "last " + failingMillis + "ms. The server will be pinged to see if it recovers" + - (doomMillis >= 0 ? ", but this client will give up if no successes are observed within " + doomMillis + "ms" : "") + + "last " + failingNanos / 1_000_000 + "ms. The server will be pinged to see if it recovers" + + (doomNanos >= 0 ? ", but this client will give up if no successes are observed within " + doomNanos / 1_000_000 + "ms" : "") + ". First failure was '" + detail.get() + "'."); - if (doomMillis >= 0 && failingMillis > doomMillis && open.compareAndSet(false, true)) - log.log(WARNING, "Circuit breaker is now open, after " + doomMillis + "ms of failing request, " + + if (doomNanos >= 0 && failingNanos > doomNanos && open.compareAndSet(false, true)) + log.log(WARNING, "Circuit breaker is now open, after " + doomNanos / 1_000_000 + "ms of failing request, " + "and this client will give up and abort its remaining feed operations."); return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java index 517fa7e4924..d12d72f7a70 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java @@ -115,7 +115,7 @@ class HttpFeedClient implements FeedClient { requestHeaders, operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way? params.timeout().orElse(maxTimeout), - System::currentTimeMillis); + System::nanoTime); CompletableFuture<Result> promise = new CompletableFuture<>(); requestStrategy.enqueue(documentId, request) @@ -141,7 +141,7 @@ class HttpFeedClient implements FeedClient { requestHeaders, null, Duration.ofSeconds(15), - System::currentTimeMillis); + System::nanoTime); CompletableFuture<HttpResponse> future = new CompletableFuture<>(); cluster.dispatch(request, future); HttpResponse response = future.get(20, TimeUnit.SECONDS); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java index a75c56c9368..fdd35b74c35 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java @@ -12,16 +12,18 @@ class HttpRequest { private final String path; private final Map<String, Supplier<String>> headers; private final byte[] body; - private final long deadlineMillis; - private final LongSupplier clock; + private final Duration timeout; + private final long deadlineNanos; + private final LongSupplier nanoClock; - public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier clock) { + public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) { this.method = method; this.path = path; this.headers = headers; this.body = body; - this.deadlineMillis = clock.getAsLong() + timeout.toMillis(); - this.clock = clock; + this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos(); + this.timeout = timeout; + this.nanoClock = nanoClock; } public String method() { @@ -41,7 +43,11 @@ class HttpRequest { } public Duration timeLeft() { - return Duration.ofMillis(deadlineMillis - clock.getAsLong()); + return Duration.ofNanos(deadlineNanos - nanoClock.getAsLong()); + } + + public Duration timeout() { + return timeout; } @Override diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java index e856a86ffec..18369f29f0b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java @@ -84,7 +84,7 @@ class JettyCluster implements Cluster { endpoint.inflight.incrementAndGet(); long reqTimeoutMillis = req.timeLeft().toMillis(); if (reqTimeoutMillis <= 0) { - vessel.completeExceptionally(new TimeoutException("Operation timed out")); + vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'")); return; } Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path())) diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java index f5ca70fe291..52b8dcc5884 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java @@ -19,39 +19,39 @@ class GracePeriodCircuitBreakerTest { @Test void testCircuitBreaker() { - AtomicLong now = new AtomicLong(0); - long SECOND = 1000; - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); + AtomicLong nowNanos = new AtomicLong(0); + long SECOND = 1_000_000_000L; + CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); Throwable error = new Error(); assertEquals(CLOSED, breaker.state(), "Initial state is closed"); - now.addAndGet(100 * SECOND); + nowNanos.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a success"); - now.addAndGet(100 * SECOND); + nowNanos.addAndGet(100 * SECOND); assertEquals(CLOSED, breaker.state(), "State is closed some time after a success"); breaker.failure(error); assertEquals(CLOSED, breaker.state(), "State is closed right after a failure"); - now.addAndGet(SECOND); + nowNanos.addAndGet(SECOND); assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed"); - now.addAndGet(1); + nowNanos.addAndGet(1); assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed"); breaker.success(); assertEquals(CLOSED, breaker.state(), "State is closed after a new success"); breaker.failure(error); - now.addAndGet(60 * SECOND); + nowNanos.addAndGet(60 * SECOND); assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed"); - now.addAndGet(1); + nowNanos.addAndGet(1); assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); breaker.success(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 51c6ee550e5..f313f08426c 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -87,8 +87,8 @@ class HttpRequestStrategyTest { void testRetries() throws ExecutionException, InterruptedException, IOException { int minStreams = 2; // Hard limit for minimum number of streams per connection. MockCluster cluster = new MockCluster(); - AtomicLong now = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + AtomicLong nowNanos = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } @@ -102,7 +102,7 @@ class HttpRequestStrategyTest { DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), now::get); + HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get); // Runtime exception is not retried. cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); @@ -114,7 +114,7 @@ class HttpRequestStrategyTest { // IOException is retried. cluster.expect((__, vessel) -> { - now.addAndGet(200); // Exceed grace period. + nowNanos.addAndGet(200_000_000L); // Exceed grace period. vessel.completeExceptionally(new IOException("retry me")); }); expected = assertThrows(ExecutionException.class, @@ -129,7 +129,7 @@ class HttpRequestStrategyTest { assertEquals(4, strategy.stats().requests()); // Throttled requests are retried. Concurrent operations to same ID (only) are serialised. - now.set(2000); + nowNanos.set(2_000_000_000L); HttpResponse throttled = HttpResponse.of(429, null); AtomicInteger count = new AtomicInteger(3); CountDownLatch latch = new CountDownLatch(1); @@ -146,11 +146,11 @@ class HttpRequestStrategyTest { else vessel.complete(success); }); CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request); - CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get)); - assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), now::get)).get()); + CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)); + assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); latch.await(); assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2. - now.set(4000); + nowNanos.set(4_000_000_000L); assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests. completion.get().complete(success); assertEquals(success, delayed.get()); @@ -159,7 +159,7 @@ class HttpRequestStrategyTest { // Some error responses are retried. HttpResponse serverError = HttpResponse.of(503, null); cluster.expect((__, vessel) -> { - now.addAndGet(200); // Exceed grace period. + nowNanos.addAndGet(200_000_000L); // Exceed grace period. vessel.complete(serverError); }); assertEquals(serverError, strategy.enqueue(id1, request).get()); @@ -168,7 +168,7 @@ class HttpRequestStrategyTest { // Error responses are not retried when not of appropriate type. cluster.expect((__, vessel) -> vessel.complete(serverError)); - assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get)).get()); + assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get()); assertEquals(12, strategy.stats().requests()); // Some error responses are not retried. @@ -180,19 +180,19 @@ class HttpRequestStrategyTest { // IOException is not retried past timeout. cluster.expect((__, vessel) -> { - now.addAndGet(50); // Exceed grace period after 2 attempts. + nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts. vessel.completeExceptionally(new IOException("retry me")); }); expected = assertThrows(ExecutionException.class, - () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), now::get)).get()); + () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get()); assertEquals("retry me", expected.getCause().getCause().getMessage()); assertEquals(15, strategy.stats().requests()); // Circuit breaker opens some time after starting to fail. - now.set(6000); + nowNanos.set(6_000_000_000L); assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests. - now.set(605000); + nowNanos.set(605_000_000_000L); assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests. strategy.destroy(); @@ -261,8 +261,8 @@ class HttpRequestStrategyTest { @Test void testShutdown() throws IOException { MockCluster cluster = new MockCluster(); - AtomicLong now = new AtomicLong(0); - CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); + AtomicLong nowNanos = new AtomicLong(0); + CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public int retries() { return 1; } @@ -276,10 +276,10 @@ class HttpRequestStrategyTest { DocumentId id3 = DocumentId.of("ns", "type", "3"); DocumentId id4 = DocumentId.of("ns", "type", "4"); DocumentId id5 = DocumentId.of("ns", "type", "5"); - HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); - HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); - HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); - HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get); + HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); + HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get); // Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight. Phaser phaser = new Phaser(2); |