summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-30 12:02:43 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 12:02:43 +0200
commit24bfeefd369bca807776105746959896d46300ce (patch)
treefaadd7521ad8e23b800b99ff93121e9c1b4eb370 /vespaclient-container-plugin
parent566162d1790d4ba851fc5e06bab07bf1d5699084 (diff)
Strict FIFO when throttling
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java44
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java34
2 files changed, 55 insertions, 23 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
index 53beae2832f..d8a94451cd5 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
@@ -45,7 +45,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Logger;
@@ -104,7 +104,10 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
this.clock = requireNonNull(clock);
this.clusters = Map.copyOf(clusters);
this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
- this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
+ this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> {
+ context.error(TIMEOUT, "Timed out after " + defaultTimeout);
+ return true;
+ }, defaultTimeout, clock);
}
private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) {
@@ -230,24 +233,25 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
/** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
private void accept(Supplier<Result> operation, OperationContext context) {
timeouts.add(operation, context);
- send(operation, context);
+ if (throttled.size() > 0 || ! send(operation, context))
+ if ( ! throttled.add(operation, context))
+ context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
}
- /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */
- private void send(Supplier<Result> operation, OperationContext context) {
+ /** Attempts to send the given operation through the async session of this, returning {@code false} if throttled. */
+ private boolean send(Supplier<Result> operation, OperationContext context) {
Result result = operation.get();
switch (result.type()) {
case SUCCESS:
outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
- break;
+ return true;
case TRANSIENT_ERROR:
- if ( ! throttled.add(operation, context))
- context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
- break;
+ return false;
default:
log.log(WARNING, "Unknown result type '" + result.type() + "'");
case FATAL_ERROR: // intentional fallthrough
context.error(ERROR, result.getError().getMessage());
+ return true; // Request handled, don't retry.
}
}
@@ -322,7 +326,7 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
private final Duration delay;
private final long defaultWaitMillis;
- public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
+ public DelayQueue(long maxSize, BiPredicate<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
if (maxSize < 0)
throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
if (delay.isNegative())
@@ -375,11 +379,12 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
* If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
* lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
*/
- private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) {
+ private void maintain(BiPredicate<Supplier<Result>, OperationContext> action) {
while ( ! Thread.currentThread().isInterrupted()) {
try {
Instant waitUntil = null;
Iterator<Delayed> operations = queue.iterator();
+ boolean rejected = false;
while (operations.hasNext()) {
Delayed delayed = operations.next();
// Already handled: remove and continue.
@@ -388,12 +393,17 @@ public class DocumentOperationExecutorImpl implements DocumentOperationExecutor
size.decrementAndGet();
continue;
}
- // Ready for action: remove from queue and run.
- if (delayed.readyAt().isBefore(clock.instant())) {
- operations.remove();
- size.decrementAndGet();
- action.accept(delayed.operation(), delayed.context());
- continue;
+ // Ready for action: remove from queue and run unless an operation was already rejected.
+ if (delayed.readyAt().isBefore(clock.instant()) && ! rejected) {
+ if (action.test(delayed.operation(), delayed.context())) {
+ operations.remove();
+ size.decrementAndGet();
+ continue;
+ }
+ else { // If an operation is rejected, handle no more this run, and wait a short while before retrying.
+ waitUntil = clock.instant().plus(Duration.ofMillis(10));
+ rejected = true;
+ }
}
// Not yet ready for action: keep time to wake up again.
waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
index 7d7cda826b1..b57a0478ec2 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@@ -331,18 +332,31 @@ public class DocumentOperationExecutorTest {
AtomicLong counter1 = new AtomicLong(0);
AtomicLong counter2 = new AtomicLong(0);
AtomicLong counter3 = new AtomicLong(0);
+ AtomicBoolean throttle = new AtomicBoolean(true);
OperationContext context1 = new OperationContext((type, message) -> counter1.decrementAndGet(), doc -> counter1.incrementAndGet());
OperationContext context2 = new OperationContext((type, message) -> counter2.decrementAndGet(), doc -> counter2.incrementAndGet());
OperationContext context3 = new OperationContext((type, message) -> counter3.decrementAndGet(), doc -> counter3.incrementAndGet());
- DelayQueue queue = new DelayQueue(3, (operation, context) -> context.success(Optional.empty()), Duration.ofMillis(3), clock);
+ DelayQueue queue = new DelayQueue(3,
+ (operation, context) -> {
+ if (throttle.get())
+ return false;
+
+ context.success(Optional.empty());
+ return true;
+ },
+ Duration.ofMillis(30),
+ clock);
synchronized (queue) { queue.notify(); queue.wait(); } // Make sure maintainers have gone to wait before test starts.
- // Add three operations — the first shall be handled by the queue, the second by an external called, and the third during shutdown.
+ // Add three operations:
+ //  the first shall be handled by the queue on second attempt,
+ // the second by an external call,and
+ // the third during shutdown — added later.
assertTrue(queue.add(nullOperation, context1));
- clock.advance(Duration.ofMillis(2));
+ clock.advance(Duration.ofMillis(20));
assertTrue(queue.add(nullOperation, context2));
assertTrue(queue.add(nullOperation, context3));
- assertFalse(queue.add(nullOperation, context3));
+ assertFalse("New entries should be rejected by a full queue", queue.add(nullOperation, context3));
assertEquals(3, queue.size());
assertEquals(0, counter1.get());
assertEquals(0, counter2.get());
@@ -355,8 +369,16 @@ public class DocumentOperationExecutorTest {
assertEquals(0, counter3.get());
assertEquals(3, queue.size());
- clock.advance(Duration.ofMillis(2));
- synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer now runs, handling first and evicting second entry.
+ clock.advance(Duration.ofMillis(15));
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer now runs, failing to handle first and evicting second entry.
+ assertEquals(0, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(0, counter3.get());
+ assertEquals(2, queue.size());
+
+ throttle.set(false);
+ clock.advance(Duration.ofMillis(15));
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer runs again, successfully handling first entry.
assertEquals(1, counter1.get());
assertEquals(-1, counter2.get());
assertEquals(0, counter3.get());