diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 12:02:43 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 12:02:43 +0200 |
commit | 24bfeefd369bca807776105746959896d46300ce (patch) | |
tree | faadd7521ad8e23b800b99ff93121e9c1b4eb370 /vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java | |
parent | 566162d1790d4ba851fc5e06bab07bf1d5699084 (diff) |
Strict FIFO when throttling
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java | 44 |
1 files changed, 27 insertions, 17 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java index 53beae2832f..d8a94451cd5 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java @@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.logging.Logger; @@ -104,7 +104,10 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor this.clock = requireNonNull(clock); this.clusters = Map.copyOf(clusters); this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock); - this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock); + this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> { + context.error(TIMEOUT, "Timed out after " + defaultTimeout); + return true; + }, defaultTimeout, clock); } private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) { @@ -230,24 +233,25 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */ private void accept(Supplier<Result> operation, OperationContext context) { timeouts.add(operation, context); - send(operation, context); + if (throttled.size() > 0 || ! send(operation, context)) + if ( ! throttled.add(operation, context)) + context.error(OVERLOAD, maxThrottled + " requests already in retry queue"); } - /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */ - private void send(Supplier<Result> operation, OperationContext context) { + /** Attempts to send the given operation through the async session of this, returning {@code false} if throttled. */ + private boolean send(Supplier<Result> operation, OperationContext context) { Result result = operation.get(); switch (result.type()) { case SUCCESS: outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge); - break; + return true; case TRANSIENT_ERROR: - if ( ! throttled.add(operation, context)) - context.error(OVERLOAD, maxThrottled + " requests already in retry queue"); - break; + return false; default: log.log(WARNING, "Unknown result type '" + result.type() + "'"); case FATAL_ERROR: // intentional fallthrough context.error(ERROR, result.getError().getMessage()); + return true; // Request handled, don't retry. } } @@ -322,7 +326,7 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor private final Duration delay; private final long defaultWaitMillis; - public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) { + public DelayQueue(long maxSize, BiPredicate<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) { if (maxSize < 0) throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize); if (delay.isNegative()) @@ -375,11 +379,12 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first. */ - private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) { + private void maintain(BiPredicate<Supplier<Result>, OperationContext> action) { while ( ! Thread.currentThread().isInterrupted()) { try { Instant waitUntil = null; Iterator<Delayed> operations = queue.iterator(); + boolean rejected = false; while (operations.hasNext()) { Delayed delayed = operations.next(); // Already handled: remove and continue. @@ -388,12 +393,17 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor size.decrementAndGet(); continue; } - // Ready for action: remove from queue and run. - if (delayed.readyAt().isBefore(clock.instant())) { - operations.remove(); - size.decrementAndGet(); - action.accept(delayed.operation(), delayed.context()); - continue; + // Ready for action: remove from queue and run unless an operation was already rejected. + if (delayed.readyAt().isBefore(clock.instant()) && ! rejected) { + if (action.test(delayed.operation(), delayed.context())) { + operations.remove(); + size.decrementAndGet(); + continue; + } + else { // If an operation is rejected, handle no more this run, and wait a short while before retrying. + waitUntil = clock.instant().plus(Duration.ofMillis(10)); + rejected = true; + } } // Not yet ready for action: keep time to wake up again. waitUntil = waitUntil != null ? waitUntil : delayed.readyAt(); |