diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-08-31 23:33:24 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-08-31 23:33:24 +0200 |
commit | 95edcffea67641096f2c4f09343372592952eebd (patch) | |
tree | 9df9caa5ac517258e5ab1d49651af1362d48f864 /vespa-http-client/src | |
parent | 924ba9becee4f608db610f402422efd71ab6073c (diff) |
Drain old connections with exponential dropoff due to reduce cost
Diffstat (limited to 'vespa-http-client/src')
2 files changed, 37 insertions, 28 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 2417208fba3..8f052a1b3f6 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -455,8 +455,14 @@ class IOThread implements Runnable, AutoCloseable { for (Iterator<GatewayConnection> i = oldConnections.iterator(); i.hasNext(); ) { GatewayConnection connection = i.next(); if (closingTime(connection).isBefore(clock.instant())) { - connection.close(); - i.remove(); + try { + processResponse(connection.poll()); + connection.close(); + i.remove(); + } + catch (Exception e) { + // Old connection; best effort + } } else if (timeToPoll(connection)) { try { @@ -476,14 +482,13 @@ class IOThread implements Runnable, AutoCloseable { private boolean timeToPoll(GatewayConnection connection) { if (connection.lastPollTime() == null) return true; - // Poll less the closer the connection comes to closing time - double newness = ( closingTime(connection).toEpochMilli() - clock.millis() ) / - (double)localQueueTimeOut.toMillis(); - if (newness < 0) return true; // connection retired prematurely - if (newness > 1) return false; // closing time reached - Duration pollInterval = Duration.ofMillis(pollIntervalUS / 1000 + - (long)((1 - newness) * ( maxOldConnectionPollInterval.toMillis() - pollIntervalUS / 1000))); - return connection.lastPollTime().plus(pollInterval).isBefore(clock.instant()); + // Exponential (2^x) dropoff: + double unit = pollIntervalUS / 1000.0; + double x = ( clock.millis() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; + double lastX = ( connection.lastPollTime().toEpochMilli() - connection.connectionTime().plus(connectionTimeToLive).toEpochMilli() ) / unit; + + double currentDelayDoublings = Math.log(lastX)/Math.log(2); + return (x - lastX) > Math.pow(2, currentDelayDoublings); } public static class ConnectionStats { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java index 615fa22a6cf..7bac8f9cd9d 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/NewIOThreadTest.java @@ -66,32 +66,36 @@ public class NewIOThreadTest { tester.send("doc1"); tester.tick(1); - tester.clock().advance(Duration.ofSeconds(20)); // Default connection ttl is 15 + tester.clock().advance(Duration.ofSeconds(16)); // Default connection ttl is 15 tester.tick(3); assertEquals(1, ioThread.oldConnections().size()); assertEquals(firstConnection, ioThread.oldConnections().get(0)); assertNotSame(firstConnection, ioThread.currentConnection()); - assertEquals(20, firstConnection.lastPollTime().toEpochMilli() / 1000); - - // Check old connection poll pattern (linear backoff) - assertLastPollTimeWhenAdvancing(21, 1, firstConnection, tester); + assertEquals(16, firstConnection.lastPollTime().toEpochMilli() / 1000); + + // Check old connection poll pattern (exponential backoff) + assertLastPollTimeWhenAdvancing(16, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(18, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(23, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(24, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(24, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(26, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(26, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(28, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(28, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(22, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); + assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); assertLastPollTimeWhenAdvancing(30, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(32, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(32, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(34, 1, firstConnection, tester); - assertLastPollTimeWhenAdvancing(37, 1, firstConnection, tester); tester.clock().advance(Duration.ofSeconds(200)); tester.tick(1); |