diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-30 15:24:23 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-30 15:27:19 +0200 |
commit | c3bfa8316ac359dffce121f1900e5bf63ec11cf0 (patch) | |
tree | a57629208114a5dc082b521e353c563beab94f79 /vespa-http-client/src/test/java | |
parent | ff205ce5e2eccafeb0957007fb2671f1488e57c3 (diff) |
Close connection early if no inflight operations
Diffstat (limited to 'vespa-http-client/src/test/java')
2 files changed, 34 insertions, 7 deletions
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java index da82079e992..55961e4aa0e 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/EndpointResultQueueTest.java @@ -7,6 +7,7 @@ import com.yahoo.vespa.http.client.core.EndpointResult; import com.yahoo.vespa.http.client.core.operationProcessor.OperationProcessor; import org.junit.Test; +import java.time.Clock; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -28,6 +29,7 @@ public class EndpointResultQueueTest { public void testBasics() { Endpoint endpoint = Endpoint.create("a"); + GatewayConnection connection = new DryRunGatewayConnection(endpoint, Clock.systemUTC()); OperationProcessor mockAggregator = mock(OperationProcessor.class); final AtomicInteger resultCount = new AtomicInteger(0); @@ -39,11 +41,11 @@ public class EndpointResultQueueTest { EndpointResultQueue q = new EndpointResultQueue( mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L * 1000L); - q.operationSent("op1"); + q.operationSent("op1", connection); assertThat(q.getPendingSize(), is(1)); - q.operationSent("op2"); + q.operationSent("op2", connection); assertThat(q.getPendingSize(), is(2)); - q.operationSent("op3"); + q.operationSent("op3", connection); assertThat(q.getPendingSize(), is(3)); q.resultReceived(new EndpointResult("op1", new Result.Detail(endpoint)), 0); assertThat(q.getPendingSize(), is(2)); @@ -58,9 +60,9 @@ public class EndpointResultQueueTest { assertThat(resultCount.get(), is(5)); - q.operationSent("op4"); + q.operationSent("op4", connection); assertThat(q.getPendingSize(), is(1)); - q.operationSent("op5"); + q.operationSent("op5", connection); assertThat(q.getPendingSize(), is(2)); q.failPending(new RuntimeException()); @@ -72,7 +74,6 @@ public class EndpointResultQueueTest { @Test public void testTimeout() throws InterruptedException { Endpoint endpoint = Endpoint.create("a"); - OperationProcessor mockAggregator = mock(OperationProcessor.class); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocationOnMock -> { @@ -81,7 +82,7 @@ public class EndpointResultQueueTest { }).when(mockAggregator).resultReceived(any(), eq(0)); EndpointResultQueue q = new EndpointResultQueue( mockAggregator, endpoint, 0, new ScheduledThreadPoolExecutor(1), 100L); - q.operationSent("1234"); + q.operationSent("1234", new DryRunGatewayConnection(endpoint, Clock.systemUTC())); assert(latch.await(120, TimeUnit.SECONDS)); } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index ef90d6853b1..bddfecdfe65 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -137,6 +137,32 @@ public class IOThreadTest { assertEquals("Old connection is eventually removed", 0, ioThread.oldConnections().size()); } + @Test + public void old_connections_are_closed_early_if_no_inflight_operations() { + OperationProcessorTester tester = new OperationProcessorTester(); + tester.tick(3); + + IOThread ioThread = tester.getSingleIOThread(); + DryRunGatewayConnection firstConnection = (DryRunGatewayConnection)ioThread.currentConnection(); + assertEquals(0, ioThread.oldConnections().size()); + + firstConnection.hold(true); // do not send result for doc1 in next http response + tester.send("doc1"); + tester.tick(1); + tester.clock().advance(Duration.ofSeconds(31)); // Default connection TTL + 1 + tester.tick(1); + assertEquals(1, ioThread.oldConnections().size()); + + firstConnection.hold(false); // send result for both doc1 and doc2 in next http response + tester.send("doc2"); + tester.tick(1); + assertEquals(1, ioThread.oldConnections().size()); + tester.clock().advance(Duration.ofSeconds(2)); + tester.tick(3); + assertLastPollTimeWhenAdvancing(33, 1, firstConnection, tester); + assertEquals(0, ioThread.oldConnections().size()); + } + private void assertLastPollTimeWhenAdvancing(int lastPollTimeSeconds, int advanceSeconds, DryRunGatewayConnection connection, |