summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-09 10:21:57 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-09 14:07:42 +0200
commitc1df455e40f18e7a6e8814fb40d725bab07eb601 (patch)
tree072a70389e1eda28593ad7050b64076cf5966094 /vespa-feed-client
parent6382fff8391393bd6d885c4e43152d00c5990c1d (diff)
Fix and test circuit breaker logic
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java3
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java33
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java60
3 files changed, 82 insertions, 14 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
index da575a7cf6d..d0a221ed358 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java
@@ -7,7 +7,6 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Path;
-import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@@ -35,7 +34,7 @@ public class FeedClientBuilder {
int connectionsPerEndpoint = 4;
int maxStreamsPerConnection = 128;
FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy;
- FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Clock.systemUTC(), Duration.ofSeconds(1), Duration.ofMinutes(10));
+ FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10));
Path certificate;
Path privateKey;
Path caCertificates;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
index 974d18418ec..2c5c2dccf19 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java
@@ -1,10 +1,10 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
-import java.time.Clock;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
import java.util.logging.Logger;
import static java.util.Objects.requireNonNull;
@@ -13,19 +13,26 @@ import static java.util.logging.Level.WARNING;
/**
* Breaks the circuit when no successes have been recorded for a specified time.
+ *
+ * @author jonmv
*/
public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName());
+ private static final long NEVER = 1L << 60;
- private final AtomicLong lastSuccessMillis = new AtomicLong(0); // Trigger if first response is a failure.
+ private final AtomicLong failingSinceMillis = new AtomicLong(NEVER);
private final AtomicBoolean halfOpen = new AtomicBoolean(false);
private final AtomicBoolean open = new AtomicBoolean(false);
- private final Clock clock;
+ private final LongSupplier clock;
private final long graceMillis;
private final long doomMillis;
- GracePeriodCircuitBreaker(Clock clock, Duration grace, Duration doom) {
+ public GracePeriodCircuitBreaker(Duration grace, Duration doom) {
+ this(System::currentTimeMillis, grace, doom);
+ }
+
+ GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) {
if (grace.isNegative())
throw new IllegalArgumentException("Grace delay must be non-negative");
@@ -39,23 +46,25 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker {
@Override
public void success() {
- lastSuccessMillis.set(clock.millis());
- if (halfOpen.compareAndSet(true, false))
+ failingSinceMillis.set(NEVER);
+ if ( ! open.get() && halfOpen.compareAndSet(true, false))
log.log(INFO, "Circuit breaker is now closed");
}
@Override
public void failure() {
- long nowMillis = clock.millis();
- if (lastSuccessMillis.get() < nowMillis - doomMillis && open.compareAndSet(false, true))
- log.log(WARNING, "Circuit breaker is now open");
-
- if (lastSuccessMillis.get() < nowMillis - graceMillis && halfOpen.compareAndSet(false, true))
- log.log(INFO, "Circuit breaker is now half-open");
+ failingSinceMillis.compareAndSet(NEVER, clock.getAsLong());
}
@Override
public State state() {
+ long failingMillis = clock.getAsLong() - failingSinceMillis.get();
+ if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true))
+ log.log(INFO, "Circuit breaker is now half-open");
+
+ if (failingMillis > doomMillis && open.compareAndSet(false, true))
+ log.log(WARNING, "Circuit breaker is now open");
+
return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED;
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java
new file mode 100644
index 00000000000..6b39d9053b4
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java
@@ -0,0 +1,60 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import ai.vespa.feed.client.FeedClient.CircuitBreaker;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
+import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author jonmv
+ */
+class GracePeriodCircuitBreakerTest {
+
+ @Test
+ void testCircuitBreaker() {
+ AtomicLong now = new AtomicLong(0);
+ long SECOND = 1000;
+ CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1));
+
+ assertEquals(CLOSED, breaker.state(), "Initial state is closed");
+
+ now.addAndGet(100 * SECOND);
+ assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity");
+
+ breaker.success();
+ assertEquals(CLOSED, breaker.state(), "State is closed after a success");
+
+ now.addAndGet(100 * SECOND);
+ assertEquals(CLOSED, breaker.state(), "State is closed some time after a success");
+
+ breaker.failure();
+ assertEquals(CLOSED, breaker.state(), "State is closed right after a failure");
+
+ now.addAndGet(SECOND);
+ assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed");
+
+ now.addAndGet(1);
+ assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed");
+
+ breaker.success();
+ assertEquals(CLOSED, breaker.state(), "State is closed after a new success");
+
+ breaker.failure();
+ now.addAndGet(60 * SECOND);
+ assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passedd");
+
+ now.addAndGet(1);
+ assertEquals(OPEN, breaker.state(), "State is open when doom period has passed");
+
+ breaker.success();
+ assertEquals(OPEN, breaker.state(), "State remains open in spite of new successes");
+ }
+
+}