diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 10:21:57 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 14:07:42 +0200 |
commit | c1df455e40f18e7a6e8814fb40d725bab07eb601 (patch) | |
tree | 072a70389e1eda28593ad7050b64076cf5966094 /vespa-feed-client/src | |
parent | 6382fff8391393bd6d885c4e43152d00c5990c1d (diff) |
Fix and test circuit breaker logic
Diffstat (limited to 'vespa-feed-client/src')
3 files changed, 82 insertions, 14 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java index da575a7cf6d..d0a221ed358 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClientBuilder.java @@ -7,7 +7,6 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.nio.file.Path; -import java.time.Clock; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -35,7 +34,7 @@ public class FeedClientBuilder { int connectionsPerEndpoint = 4; int maxStreamsPerConnection = 128; FeedClient.RetryStrategy retryStrategy = defaultRetryStrategy; - FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Clock.systemUTC(), Duration.ofSeconds(1), Duration.ofMinutes(10)); + FeedClient.CircuitBreaker circuitBreaker = new GracePeriodCircuitBreaker(Duration.ofSeconds(1), Duration.ofMinutes(10)); Path certificate; Path privateKey; Path caCertificates; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java index 974d18418ec..2c5c2dccf19 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/GracePeriodCircuitBreaker.java @@ -1,10 +1,10 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; -import java.time.Clock; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; import java.util.logging.Logger; import static java.util.Objects.requireNonNull; @@ -13,19 +13,26 @@ import static java.util.logging.Level.WARNING; /** * Breaks the circuit when no successes have been recorded for a specified time. + * + * @author jonmv */ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { private static final Logger log = Logger.getLogger(GracePeriodCircuitBreaker.class.getName()); + private static final long NEVER = 1L << 60; - private final AtomicLong lastSuccessMillis = new AtomicLong(0); // Trigger if first response is a failure. + private final AtomicLong failingSinceMillis = new AtomicLong(NEVER); private final AtomicBoolean halfOpen = new AtomicBoolean(false); private final AtomicBoolean open = new AtomicBoolean(false); - private final Clock clock; + private final LongSupplier clock; private final long graceMillis; private final long doomMillis; - GracePeriodCircuitBreaker(Clock clock, Duration grace, Duration doom) { + public GracePeriodCircuitBreaker(Duration grace, Duration doom) { + this(System::currentTimeMillis, grace, doom); + } + + GracePeriodCircuitBreaker(LongSupplier clock, Duration grace, Duration doom) { if (grace.isNegative()) throw new IllegalArgumentException("Grace delay must be non-negative"); @@ -39,23 +46,25 @@ public class GracePeriodCircuitBreaker implements FeedClient.CircuitBreaker { @Override public void success() { - lastSuccessMillis.set(clock.millis()); - if (halfOpen.compareAndSet(true, false)) + failingSinceMillis.set(NEVER); + if ( ! open.get() && halfOpen.compareAndSet(true, false)) log.log(INFO, "Circuit breaker is now closed"); } @Override public void failure() { - long nowMillis = clock.millis(); - if (lastSuccessMillis.get() < nowMillis - doomMillis && open.compareAndSet(false, true)) - log.log(WARNING, "Circuit breaker is now open"); - - if (lastSuccessMillis.get() < nowMillis - graceMillis && halfOpen.compareAndSet(false, true)) - log.log(INFO, "Circuit breaker is now half-open"); + failingSinceMillis.compareAndSet(NEVER, clock.getAsLong()); } @Override public State state() { + long failingMillis = clock.getAsLong() - failingSinceMillis.get(); + if (failingMillis > graceMillis && halfOpen.compareAndSet(false, true)) + log.log(INFO, "Circuit breaker is now half-open"); + + if (failingMillis > doomMillis && open.compareAndSet(false, true)) + log.log(WARNING, "Circuit breaker is now open"); + return open.get() ? State.OPEN : halfOpen.get() ? State.HALF_OPEN : State.CLOSED; } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java new file mode 100644 index 00000000000..6b39d9053b4 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/GracePeriodCircuitBreakerTest.java @@ -0,0 +1,60 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import ai.vespa.feed.client.FeedClient.CircuitBreaker; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicLong; + +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; +import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class GracePeriodCircuitBreakerTest { + + @Test + void testCircuitBreaker() { + AtomicLong now = new AtomicLong(0); + long SECOND = 1000; + CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(1)); + + assertEquals(CLOSED, breaker.state(), "Initial state is closed"); + + now.addAndGet(100 * SECOND); + assertEquals(CLOSED, breaker.state(), "State is closed after some time without activity"); + + breaker.success(); + assertEquals(CLOSED, breaker.state(), "State is closed after a success"); + + now.addAndGet(100 * SECOND); + assertEquals(CLOSED, breaker.state(), "State is closed some time after a success"); + + breaker.failure(); + assertEquals(CLOSED, breaker.state(), "State is closed right after a failure"); + + now.addAndGet(SECOND); + assertEquals(CLOSED, breaker.state(), "State is closed until grace period has passed"); + + now.addAndGet(1); + assertEquals(HALF_OPEN, breaker.state(), "State is half-open when grace period has passed"); + + breaker.success(); + assertEquals(CLOSED, breaker.state(), "State is closed after a new success"); + + breaker.failure(); + now.addAndGet(60 * SECOND); + assertEquals(HALF_OPEN, breaker.state(), "State is half-open until doom period has passedd"); + + now.addAndGet(1); + assertEquals(OPEN, breaker.state(), "State is open when doom period has passed"); + + breaker.success(); + assertEquals(OPEN, breaker.state(), "State remains open in spite of new successes"); + } + +} |