From 3f3eac749739ae2ae3070d2f94751f633ce8c2f8 Mon Sep 17 00:00:00 2001 From: Haakon Dybdahl Date: Mon, 26 Sep 2016 13:34:45 +0200 Subject: Improve time-out handling for close call. --- .../vespa/http/client/core/api/FeedClientImpl.java | 36 +++++++++++++++++----- .../http/client/core/api/FeedClientImplTest.java | 31 +++++++++++++++++++ 2 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index 64836ccb20d..e6ff8593e71 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -13,6 +13,7 @@ import java.nio.charset.CodingErrorAction; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Implementation of FeedClient. It is a thin layer on top of multiClusterHandler and multiClusterResultAggregator. @@ -21,10 +22,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; public class FeedClientImpl implements FeedClient { private final OperationProcessor operationProcessor; + private final long closeTimeoutMs; + private final long sleepTimeMs = 500; public FeedClientImpl( SessionParams sessionParams, ResultCallback resultCallback, ScheduledThreadPoolExecutor timeoutExecutor) { - + this.closeTimeoutMs = sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + + sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS); this.operationProcessor = new OperationProcessor( new IncompleteResultsThrottler( sessionParams.getThrottlerMinSize(), @@ -53,13 +57,15 @@ public class FeedClientImpl implements FeedClient { @Override public void close() { - Instant startTime = Instant.now(); - while (operationProcessor.getIncompleteResultQueueSize() > 0 && startTime.plusSeconds(30).isAfter(Instant.now())) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - break; + Instant lastResultReceived = Instant.now(); + long lastNumberOfResults = operationProcessor.getIncompleteResultQueueSize(); + + while (waitForOperations(lastResultReceived, lastNumberOfResults, sleepTimeMs, closeTimeoutMs)) { + long results = operationProcessor.getIncompleteResultQueueSize(); + if (results != lastNumberOfResults) { + lastResultReceived = Instant.now(); } + lastNumberOfResults = results; } operationProcessor.close(); } @@ -68,4 +74,20 @@ public class FeedClientImpl implements FeedClient { public String getStatsAsJson() { return operationProcessor.getStatsAsJson(); } + + // On return value true, wait more. Public for testing. + public static boolean waitForOperations(Instant lastResultReceived, long lastNumberOfResults, long sleepTimeMs, long closeTimeoutMs) { + if (lastNumberOfResults == 0) { + return false; + } + if (lastResultReceived.plusMillis(closeTimeoutMs).isBefore(Instant.now())) { + return false; + } + try { + Thread.sleep(sleepTimeMs); + } catch (InterruptedException e) { + return false; + } + return true; + } } diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java new file mode 100644 index 00000000000..79d66be9f97 --- /dev/null +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/api/FeedClientImplTest.java @@ -0,0 +1,31 @@ +package com.yahoo.vespa.http.client.core.api; + +import org.junit.Test; + +import java.time.Instant; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.*; + +/** + * @author dybis + */ +public class FeedClientImplTest { + + int sleepValueMillis = 1; + + @Test + public void testCloseWaitTimeOldTimestamp() { + assertThat(FeedClientImpl.waitForOperations(Instant.now().minusSeconds(1000), 1, sleepValueMillis, 10), is(false)); + } + + @Test + public void testCloseWaitTimeOutInFutureStillOperations() { + assertThat(FeedClientImpl.waitForOperations(Instant.now(), 1, sleepValueMillis, 2000), is(true)); + } + + @Test + public void testCloseWaitZeroOperations() { + assertThat(FeedClientImpl.waitForOperations(Instant.now(), 0, sleepValueMillis, 2000), is(false)); + } +} \ No newline at end of file -- cgit v1.2.3