diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-14 08:21:04 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-14 09:33:09 +0200 |
commit | d60e4a6842add6cc04ac1782f54a909a07f33523 (patch) | |
tree | ed6fb81fa79c7d97fecb64e69dc25284a1684e42 /vespa-feed-client | |
parent | fa0844e8bf086c36326f6717bfe44d0b855f6a79 (diff) |
Expose circuit breaker state, to let user close client from outside
Diffstat (limited to 'vespa-feed-client')
6 files changed, 21 insertions, 19 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index d4cd53daecc..39d343515fe 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -22,6 +22,9 @@ public interface FeedClient extends Closeable { /** Returns a snapshot of the stats for this feed client, such as requests made, and responses by status. */ OperationStats stats(); + /** Current state of the circuit breaker. */ + default CircuitBreaker.State circuitBreakerState() { return CircuitBreaker.State.CLOSED; } + /** Shut down, and reject new operations. Operations in flight are allowed to complete normally if graceful. */ void close(boolean graceful); diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java index 256d3ae535c..b160cced4b9 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpFeedClient.java @@ -63,6 +63,11 @@ class HttpFeedClient implements FeedClient { } @Override + public CircuitBreaker.State circuitBreakerState() { + return requestStrategy.circuitBreakerState(); + } + + @Override public void close(boolean graceful) { closed.set(true); if (graceful) @@ -71,17 +76,7 @@ class HttpFeedClient implements FeedClient { requestStrategy.destroy(); } - private void ensureOpen() { - if (requestStrategy.hasFailed()) - close(); - - if (closed.get()) - throw new IllegalStateException("Client is closed, no further operations may be sent"); - } - private CompletableFuture<Result> send(String method, DocumentId documentId, String operationJson, OperationParameters params) { - ensureOpen(); - HttpRequest request = new HttpRequest(method, getPath(documentId) + getQuery(params), requestHeaders, diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 5646d37cde3..3cce423735f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -75,16 +75,22 @@ class HttpRequestStrategy implements RequestStrategy { dispatcher.start(); } + @Override public OperationStats stats() { return cluster.stats(); } + @Override + public CircuitBreaker.State circuitBreakerState() { + return breaker.state(); + } + private void dispatch() { try { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. - Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); + Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); // TODO: Reduce throughput when turning half-open? } } catch (InterruptedException e) { @@ -188,11 +194,6 @@ class HttpRequestStrategy implements RequestStrategy { inflight.decrementAndGet(); } - @Override - public boolean hasFailed() { - return breaker.state() == OPEN; - } - public void await() { try { while (inflight.get() > 0) diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java index dc889d29d36..eb52c3f9e27 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JettyCluster.java @@ -120,4 +120,5 @@ class JettyCluster implements Cluster { } } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java index 7764dce712b..a1101eb0ebb 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/RequestStrategy.java @@ -1,6 +1,8 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package ai.vespa.feed.client; +import ai.vespa.feed.client.FeedClient.CircuitBreaker.State; + import java.util.concurrent.CompletableFuture; /** @@ -13,8 +15,8 @@ interface RequestStrategy { /** Stats for operations sent through this. */ OperationStats stats(); - /** Whether this has failed fatally, and we should cease sending further operations. */ - boolean hasFailed(); + /** State of the circuit breaker. */ + State circuitBreakerState(); /** Forcibly terminates this, causing all inflight operations to complete immediately. */ void destroy(); diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java index 0605ed5bf65..d8090549420 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpFeedClientTest.java @@ -25,7 +25,7 @@ class HttpFeedClientTest { AtomicReference<BiFunction<DocumentId, HttpRequest, CompletableFuture<HttpResponse>>> dispatch = new AtomicReference<>(); class MockRequestStrategy implements RequestStrategy { @Override public OperationStats stats() { throw new UnsupportedOperationException(); } - @Override public boolean hasFailed() { return false; } + @Override public FeedClient.CircuitBreaker.State circuitBreakerState() { return FeedClient.CircuitBreaker.State.CLOSED; } @Override public void destroy() { throw new UnsupportedOperationException(); } @Override public void await() { throw new UnsupportedOperationException(); } @Override public CompletableFuture<HttpResponse> enqueue(DocumentId documentId, HttpRequest request) { return dispatch.get().apply(documentId, request); } |