diff options
author | jonmv <venstad@gmail.com> | 2023-01-23 19:27:15 +0100 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-01-23 19:27:15 +0100 |
commit | 8a244bdddd7aa174b5215a14d2ffe7d8e699868a (patch) | |
tree | 9367dcff2cf95a1fd6c3bdc4e77df96d68194627 /controller-server | |
parent | 65edcb0eac7f77b7ca58cf92dd0ec50d332c81bb (diff) |
Discard in dispatcher instead, which knows more about state
This also avoids discarding "high" priority requests immediately upon enqueuing them.
Diffstat (limited to 'controller-server')
3 files changed, 50 insertions, 15 deletions
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceForwarder.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceForwarder.java index 481258a295e..d0d5ddf55ef 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceForwarder.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceForwarder.java @@ -30,12 +30,6 @@ import java.util.logging.Logger; */ public class NameServiceForwarder { - /** - * The number of {@link NameServiceRequest}s we allow to be queued. When the queue overflows, the first requests - * are dropped in a FIFO order until the queue shrinks below this capacity. - */ - private static final int QUEUE_CAPACITY = 400; - private static final Logger log = Logger.getLogger(NameServiceForwarder.class.getName()); private final CuratorDb db; @@ -87,13 +81,12 @@ public class NameServiceForwarder { try (Mutex lock = db.lockNameServiceQueue()) { NameServiceQueue queue = db.readNameServiceQueue(); var queued = queue.requests().size(); - if (queued >= QUEUE_CAPACITY) { - log.log(Level.WARNING, "Queue is at capacity (size: " + queued + "), dropping older " + - "requests. This likely means that the name service is not successfully " + - "executing requests"); + if (queued > NameServiceQueue.QUEUE_CAPACITY) { + log.log(Level.WARNING, "Queue is above capacity (size: " + queued + "), failing requests will be dropped. 
" + + "This likely means that the name service is not successfully executing requests"); } log.log(Level.FINE, () -> "Queueing name service request: " + request); - db.writeNameServiceQueue(queue.with(request, priority).last(QUEUE_CAPACITY)); + db.writeNameServiceQueue(queue.with(request, priority)); } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueue.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueue.java index dbcfa6705ec..ab22b551b2b 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueue.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueue.java @@ -10,7 +10,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Objects; import java.util.Optional; -import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,6 +29,13 @@ public record NameServiceQueue(List<NameServiceRequest> requests) { public static final NameServiceQueue EMPTY = new NameServiceQueue(List.of()); + /** + * The number of {@link NameServiceRequest}s we allow to be queued. When the queue overflows, failing requests + * are dropped in a FIFO order until the queue shrinks below this capacity. If that is not enough, the oldest + * requests will also be dropped, as needed. + */ + static final int QUEUE_CAPACITY = 400; + private static final Logger log = Logger.getLogger(NameServiceQueue.class.getName()); /** DO NOT USE. 
Public for serialization purposes */ @@ -84,8 +90,11 @@ public record NameServiceQueue(List<NameServiceRequest> requests) { request.dispatchTo(nameService); } catch (Exception e) { + boolean dropFailingRequest = pending.size() > QUEUE_CAPACITY; log.log(Level.WARNING, "Failed to execute " + request + ": " + Exceptions.toMessageString(e) + - ", request will be moved backwards, and retried"); + ", request will " + (dropFailingRequest ? "be dropped, as queue is over capacity" + : "be moved backwards, and retried")); + if (dropFailingRequest) continue; // Move all requests with the same owner backwards as far as we can, i.e., to the back, or to the first owner-less request. Optional<TenantAndApplicationId> owner = request.owner(); @@ -103,7 +112,14 @@ public record NameServiceQueue(List<NameServiceRequest> requests) { pending.addAll(0, others); // then append requests owned by others before that again. } } - return new NameServiceQueue(pending); + + NameServiceQueue remaining = new NameServiceQueue(pending); + if (pending.size() > 2 * QUEUE_CAPACITY) { + log.log(Level.WARNING, "Queue has " + pending.size() + " entries, and must be emptying far too slowly; " + + "dropping the oldest entries past " + 2 * QUEUE_CAPACITY); + remaining = remaining.last(2 * QUEUE_CAPACITY); + } + return remaining; } @Override diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueueTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueueTest.java index 8536d78f843..6d555e6d455 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueueTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/dns/NameServiceQueueTest.java @@ -18,12 +18,14 @@ import org.junit.jupiter.api.Test; import java.util.ArrayDeque; import java.util.Deque; +import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.Set; import 
java.util.function.Consumer; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -106,7 +108,12 @@ public class NameServiceQueueTest { void test_failing_requests() { Deque<Consumer<RecordName>> expectations = new ArrayDeque<>(); var nameService = new NameService() { - @Override public Record createRecord(Type type, RecordName name, RecordData data) { expectations.pop().accept(name); return null; } + @Override public Record createRecord(Type type, RecordName name, RecordData data) { + var expectation = expectations.poll(); + assertNotNull(expectation, "unexpected dispatch; add more expectations, or fix the bug!"); + expectation.accept(name); + return null; + } @Override public List<Record> createAlias(RecordName name, Set<AliasTarget> targets) { throw new UnsupportedOperationException(); } @Override public List<Record> createDirect(RecordName name, Set<DirectTarget> targets) { throw new UnsupportedOperationException(); } @Override public List<Record> createTxtRecords(RecordName name, List<RecordData> txtRecords) { throw new UnsupportedOperationException(); } @@ -176,6 +183,25 @@ public class NameServiceQueueTest { assertEquals(List.of(), new NameServiceQueue(base).dispatchTo(nameService, 100).requests()); assertEquals(0, expectations.size()); + + // Finally, let the queue fill past its capacity, and see that failed requests are simply dropped instead. 
+ expectations.add(name -> { assertEquals(rec1.name(), name); throw exception; }); + expectations.add(name -> { assertEquals(rec2.name(), name); }); + expectations.add(name -> { assertEquals(rec1.name(), name); throw exception; }); + expectations.add(name -> { assertEquals(rec2.name(), name); }); + var full = new LinkedList<NameServiceRequest>(); + for (int i = 0; i < NameServiceQueue.QUEUE_CAPACITY; i++) { + full.add(req1); + full.add(req2); + } + assertEquals(full.subList(4, full.size()), + new NameServiceQueue(full).dispatchTo(nameService, 4).requests()); + + // However, if the queue is even fuller, at the end of a dispatch run, the oldest requests are discarded too. + full.add(req3); + full.add(req4); + assertEquals(full.subList(2, full.size()), + new NameServiceQueue(full).dispatchTo(nameService, 0).requests()); } } |