summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java41
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java18
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java18
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java40
6 files changed, 65 insertions, 58 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
index 1ea2089c0eb..cec7106403e 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreaker.java
@@ -12,7 +12,6 @@ import java.util.function.LongSupplier;
import java.util.logging.Logger;
import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.FINE;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
@@ -24,22 +23,22 @@ import static java.util.logging.Level.WARNING;
public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName());
- private static final long NEVER = 1L << 60;
- private final AtomicLong failingSinceMillis = new AtomicLong(NEVER);
private final AtomicBoolean halfOpen = new AtomicBoolean(false);
private final AtomicBoolean open = new AtomicBoolean(false);
- private final LongSupplier clock;
+ private final LongSupplier nanoClock;
+ private final long never;
+ private final AtomicLong failingSinceNanos;
private final AtomicReference<String> detail = new AtomicReference<>();
- private final long graceMillis;
- private final long doomMillis;
+ private final long graceNanos;
+ private final long doomNanos;
/**
* Creates a new circuit breaker with the given grace periods.
* @param grace the period of consecutive failures before state changes to half-open.
*/
public GracePeriodCircuitBreaker(Duration grace) {
- this(System::currentTimeMillis, grace, null);
+ this(System::nanoTime, grace, null);
}
/**
@@ -48,23 +47,25 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
* @param doom the period of consecutive failures before shutting down.
*/
public GracePeriodCircuitBreaker(Duration grace, Duration doom) {
- this(System::currentTimeMillis, grace, doom);
+ this(System::nanoTime, grace, doom);
if (doom.isNegative())
throw new IllegalArgumentException("Doom delay must be non-negative");
}
- GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) {
+ GracePeriodCircuitBreaker(LongSupplier nanoClock, Duration grace, Duration doom) {
if (grace.isNegative())
throw new IllegalArgumentException("Grace delay must be non-negative");
- this.clock = requireNonNull(clock);
- this.graceMillis = grace.toMillis();
- this.doomMillis = doom == null ? -1 : doom.toMillis();
+ this.nanoClock = requireNonNull(nanoClock);
+ this.never = nanoClock.getAsLong() + (1L << 60);
+ this.graceNanos = grace.toNanos();
+ this.doomNanos = doom == null ? -1 : doom.toNanos();
+ this.failingSinceNanos = new AtomicLong(never);
}
@Override
public void success() {
- failingSinceMillis.set(NEVER);
+ failingSinceNanos.set(never);
if ( ! open.get() && halfOpen.compareAndSet(true, false))
log.log(INFO, "Circuit breaker is now closed, after a request was successful");
}
@@ -80,21 +81,21 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
}
private void failure(String detail) {
- if (failingSinceMillis.compareAndSet(NEVER, clock.getAsLong()))
+ if (failingSinceNanos.compareAndSet(never, nanoClock.getAsLong()))
this.detail.set(detail);
}
@Override
public State state() {
- long failingMillis = clock.getAsLong() - failingSinceMillis.get();
- if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true))
+ long failingNanos = nanoClock.getAsLong() - failingSinceNanos.get();
+ if (failingNanos > graceNanos && halfOpen.compareAndSet(false, true))
log.log(INFO, "Circuit breaker is now half-open, as no requests have succeeded for the " +
- "last " + failingMillis + "ms. The server will be pinged to see if it recovers" +
- (doomMillis >= 0 ? ", but this client will give up if no successes are observed within " + doomMillis + "ms" : "") +
+ "last " + failingNanos / 1_000_000 + "ms. The server will be pinged to see if it recovers" +
+ (doomNanos >= 0 ? ", but this client will give up if no successes are observed within " + doomNanos / 1_000_000 + "ms" : "") +
". First failure was '" + detail.get() + "'.");
- if (doomMillis >= 0 && failingMillis > doomMillis && open.compareAndSet(false, true))
- log.log(WARNING, "Circuit breaker is now open, after " + doomMillis + "ms of failing request, " +
+ if (doomNanos >= 0 && failingNanos > doomNanos && open.compareAndSet(false, true))
+ log.log(WARNING, "Circuit breaker is now open, after " + doomNanos / 1_000_000 + "ms of failing request, " +
"and this client will give up and abort its remaining feed operations.");
return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
index 517fa7e4924..d12d72f7a70 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpFeedClient.java
@@ -115,7 +115,7 @@ class HttpFeedClient implements FeedClient {
requestHeaders,
operationJson == null ? null : operationJson.getBytes(UTF_8), // TODO: make it bytes all the way?
params.timeout().orElse(maxTimeout),
- System::currentTimeMillis);
+ System::nanoTime);
CompletableFuture<Result> promise = new CompletableFuture<>();
requestStrategy.enqueue(documentId, request)
@@ -141,7 +141,7 @@ class HttpFeedClient implements FeedClient {
requestHeaders,
null,
Duration.ofSeconds(15),
- System::currentTimeMillis);
+ System::nanoTime);
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
cluster.dispatch(request, future);
HttpResponse response = future.get(20, TimeUnit.SECONDS);
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
index a75c56c9368..fdd35b74c35 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequest.java
@@ -12,16 +12,18 @@ class HttpRequest {
private final String path;
private final Map<String, Supplier<String>> headers;
private final byte[] body;
- private final long deadlineMillis;
- private final LongSupplier clock;
+ private final Duration timeout;
+ private final long deadlineNanos;
+ private final LongSupplier nanoClock;
- public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier clock) {
+ public HttpRequest(String method, String path, Map<String, Supplier<String>> headers, byte[] body, Duration timeout, LongSupplier nanoClock) {
this.method = method;
this.path = path;
this.headers = headers;
this.body = body;
- this.deadlineMillis = clock.getAsLong() + timeout.toMillis();
- this.clock = clock;
+ this.deadlineNanos = nanoClock.getAsLong() + timeout.toNanos();
+ this.timeout = timeout;
+ this.nanoClock = nanoClock;
}
public String method() {
@@ -41,7 +43,11 @@ class HttpRequest {
}
public Duration timeLeft() {
- return Duration.ofMillis(deadlineMillis - clock.getAsLong());
+ return Duration.ofNanos(deadlineNanos - nanoClock.getAsLong());
+ }
+
+ public Duration timeout() {
+ return timeout;
}
@Override
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
index e856a86ffec..18369f29f0b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/JettyCluster.java
@@ -84,7 +84,7 @@ class JettyCluster implements Cluster {
endpoint.inflight.incrementAndGet();
long reqTimeoutMillis = req.timeLeft().toMillis();
if (reqTimeoutMillis <= 0) {
- vessel.completeExceptionally(new TimeoutException("Operation timed out"));
+ vessel.completeExceptionally(new TimeoutException("operation timed out after '" + req.timeout() + "'"));
return;
}
Request jettyReq = client.newRequest(URI.create(endpoint.uri + req.path()))
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
index f5ca70fe291..52b8dcc5884 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/GracePeriodCircuitBreakerTest.java
@@ -19,39 +19,39 @@ class GracePeriodCircuitBreakerTest {
@Test
void testCircuitBreaker() {
- AtomicLong now = new AtomicLong(0);
- long SECOND = 1000;
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
+ AtomicLong nowNanos = new AtomicLong(0);
+ long SECOND = 1_000_000_000L;
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
Throwable error = new Error();
assertEquals(CLOSED, breaker.state(), "Initial state is closed");
- now.addAndGet(100 * SECOND);
+ nowNanos.addAndGet(100 * SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity");
breaker.success();
assertEquals(CLOSED, breaker.state(), "State is closed after a success");
- now.addAndGet(100 * SECOND);
+ nowNanos.addAndGet(100 * SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed some time after a success");
breaker.failure(error);
assertEquals(CLOSED, breaker.state(), "State is closed right after a failure");
- now.addAndGet(SECOND);
+ nowNanos.addAndGet(SECOND);
assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed");
- now.addAndGet(1);
+ nowNanos.addAndGet(1);
assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed");
breaker.success();
assertEquals(CLOSED, breaker.state(), "State is closed after a new success");
breaker.failure(error);
- now.addAndGet(60 * SECOND);
+ nowNanos.addAndGet(60 * SECOND);
assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passed");
- now.addAndGet(1);
+ nowNanos.addAndGet(1);
assertEquals(OPEN, breaker.state(), "State is open when doom period has passed");
breaker.success();
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index 51c6ee550e5..f313f08426c 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -87,8 +87,8 @@ class HttpRequestStrategyTest {
void testRetries() throws ExecutionException, InterruptedException, IOException {
int minStreams = 2; // Hard limit for minimum number of streams per connection.
MockCluster cluster = new MockCluster();
- AtomicLong now = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ AtomicLong nowNanos = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setRetryStrategy(new FeedClient.RetryStrategy() {
@Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; }
@@ -102,7 +102,7 @@ class HttpRequestStrategyTest {
DocumentId id1 = DocumentId.of("ns", "type", "1");
DocumentId id2 = DocumentId.of("ns", "type", "2");
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), now::get);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(180), nowNanos::get);
// Runtime exception is not retried.
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
@@ -114,7 +114,7 @@ class HttpRequestStrategyTest {
// IOException is retried.
cluster.expect((__, vessel) -> {
- now.addAndGet(200); // Exceed grace period.
+ nowNanos.addAndGet(200_000_000L); // Exceed grace period.
vessel.completeExceptionally(new IOException("retry me"));
});
expected = assertThrows(ExecutionException.class,
@@ -129,7 +129,7 @@ class HttpRequestStrategyTest {
assertEquals(4, strategy.stats().requests());
// Throttled requests are retried. Concurrent operations to same ID (only) are serialised.
- now.set(2000);
+ nowNanos.set(2_000_000_000L);
HttpResponse throttled = HttpResponse.of(429, null);
AtomicInteger count = new AtomicInteger(3);
CountDownLatch latch = new CountDownLatch(1);
@@ -146,11 +146,11 @@ class HttpRequestStrategyTest {
else vessel.complete(success);
});
CompletableFuture<HttpResponse> delayed = strategy.enqueue(id1, request);
- CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get));
- assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), now::get)).get());
+ CompletableFuture<HttpResponse> serialised = strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get));
+ assertEquals(success, strategy.enqueue(id2, new HttpRequest("DELETE", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
latch.await();
assertEquals(8, strategy.stats().requests()); // 3 attempts at throttled and one at id2.
- now.set(4000);
+ nowNanos.set(4_000_000_000L);
assertEquals(CLOSED, breaker.state()); // Circuit not broken due to throttled requests.
completion.get().complete(success);
assertEquals(success, delayed.get());
@@ -159,7 +159,7 @@ class HttpRequestStrategyTest {
// Some error responses are retried.
HttpResponse serverError = HttpResponse.of(503, null);
cluster.expect((__, vessel) -> {
- now.addAndGet(200); // Exceed grace period.
+ nowNanos.addAndGet(200_000_000L); // Exceed grace period.
vessel.complete(serverError);
});
assertEquals(serverError, strategy.enqueue(id1, request).get());
@@ -168,7 +168,7 @@ class HttpRequestStrategyTest {
// Error responses are not retried when not of appropriate type.
cluster.expect((__, vessel) -> vessel.complete(serverError));
- assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), now::get)).get());
+ assertEquals(serverError, strategy.enqueue(id1, new HttpRequest("PUT", "/", null, null, Duration.ofSeconds(1), nowNanos::get)).get());
assertEquals(12, strategy.stats().requests());
// Some error responses are not retried.
@@ -180,19 +180,19 @@ class HttpRequestStrategyTest {
// IOException is not retried past timeout.
cluster.expect((__, vessel) -> {
- now.addAndGet(50); // Exceed grace period after 2 attempts.
+ nowNanos.addAndGet(50_000_000L); // Exceed grace period after 2 attempts.
vessel.completeExceptionally(new IOException("retry me"));
});
expected = assertThrows(ExecutionException.class,
- () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), now::get)).get());
+ () -> strategy.enqueue(id1, new HttpRequest("POST", "/", null, null, Duration.ofMillis(100), nowNanos::get)).get());
assertEquals("retry me", expected.getCause().getCause().getMessage());
assertEquals(15, strategy.stats().requests());
// Circuit breaker opens some time after starting to fail.
- now.set(6000);
+ nowNanos.set(6_000_000_000L);
assertEquals(HALF_OPEN, breaker.state()); // Circuit broken due to failed requests.
- now.set(605000);
+ nowNanos.set(605_000_000_000L);
assertEquals(OPEN, breaker.state()); // Circuit broken due to failed requests.
strategy.destroy();
@@ -261,8 +261,8 @@ class HttpRequestStrategyTest {
@Test
void testShutdown() throws IOException {
MockCluster cluster = new MockCluster();
- AtomicLong now = new AtomicLong(0);
- CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
+ AtomicLong nowNanos = new AtomicLong(0);
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(nowNanos::get, Duration.ofSeconds(1), Duration.ofMinutes(10));
HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123")))
.setRetryStrategy(new FeedClient.RetryStrategy() {
@Override public int retries() { return 1; }
@@ -276,10 +276,10 @@ class HttpRequestStrategyTest {
DocumentId id3 = DocumentId.of("ns", "type", "3");
DocumentId id4 = DocumentId.of("ns", "type", "4");
DocumentId id5 = DocumentId.of("ns", "type", "5");
- HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
- HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
- HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
- HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), now::get);
+ HttpRequest failing = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest partial = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest request = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
+ HttpRequest blocking = new HttpRequest("POST", "/", null, null, Duration.ofSeconds(1), nowNanos::get);
// Enqueue some operations to the same id, which are serialised, and then shut down while operations are in flight.
Phaser phaser = new Phaser(2);