diff options
author | Vegard Sjonfjell <vegard@yahoo-inc.com> | 2016-10-05 15:34:15 +0200 |
---|---|---|
committer | Vegard Sjonfjell <vegard@yahoo-inc.com> | 2016-10-05 15:34:15 +0200 |
commit | d22ceb89608611124291c5c9e30f7f70bac8aa98 (patch) | |
tree | 960ff704f4fc0f0fdb9dcc776e87e957b420853b /vespa-http-client | |
parent | 23cd65bb2d4a25f2d52a70f573ce4a3e25ee6b8c (diff) | |
parent | e8b571ebc3eb2592f16ca546a65bf318ba0f4df7 (diff) |
Merge branch 'master' into voffeloff/move-jsontesthelper
Diffstat (limited to 'vespa-http-client')
4 files changed, 43 insertions, 24 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java index e6ff8593e71..d7f001eff31 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java @@ -27,8 +27,9 @@ public class FeedClientImpl implements FeedClient { public FeedClientImpl( SessionParams sessionParams, ResultCallback resultCallback, ScheduledThreadPoolExecutor timeoutExecutor) { - this.closeTimeoutMs = sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + - sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS); + this.closeTimeoutMs = sessionParams.getConnectionParams().getMaxRetries() * ( + sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) + + sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS)); this.operationProcessor = new OperationProcessor( new IncompleteResultsThrottler( sessionParams.getThrottlerMinSize(), diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index 199f3dcbaa8..414ae90dd27 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -114,8 +114,8 @@ public class ClusterConnection implements AutoCloseable { IOThread ioThread = ioThreads.get(hash % ioThreads.size()); try { ioThread.post(document); - } catch (InterruptedException e) { - throw new EndpointIOException(ioThread.getEndpoint(), "While sending", e); + } catch (Throwable t) { + throw new EndpointIOException(ioThread.getEndpoint(), "While sending", t); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 505039cd2d4..60324eda47a 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -136,33 +136,18 @@ class IOThread implements Runnable, AutoCloseable { stopSignal.countDown(); log.finer("Closed called."); - try { - if (! running.await(2 * localQueueTimeOut, TimeUnit.MILLISECONDS)) { - log.info("Waited " + 2 * localQueueTimeOut - + " ms for queue to be empty, did not happen, interrupting thread."); - } - } catch (InterruptedException e) { - log.log(Level.INFO, "Interrupted while waiting for threads to finish sending.", e); - } - - // Make 5 attempts the next 30 secs to get results from previous operations. - for (int i = 0 ; i < 5; i++) { - int size = resultQueue.getPendingSize(); - if (size == 0) break; - log.info("We have outstanding operations (" + size +") , waiting for responses, iteraton: " + i + "."); + // Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here. + int size = resultQueue.getPendingSize(); + if (size > 0) { + log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); try { processResponse(client.drain()); } catch (Throwable e) { log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); - break; - } - try { - Thread.sleep(6000); - } catch (InterruptedException e) { - break; } } + try { client.close(); } finally { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java index c87385ec2ce..0eb3fc12405 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java @@ -13,11 +13,16 @@ import org.junit.Test; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> @@ -367,4 +372,32 @@ public class OperationProcessorTest { assertThat(done.await(120, TimeUnit.SECONDS), is(true)); } + + @Test + public void testSendsResponseToQueuedDocumentOnClose() throws InterruptedException { + SessionParams sessionParams = new SessionParams.Builder() + .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("#$#")).build()) + .build(); + + ScheduledThreadPoolExecutor executor = mock(ScheduledThreadPoolExecutor.class); + when(executor.awaitTermination(anyLong(), any())).thenReturn(true); + + CountDownLatch countDownLatch = new CountDownLatch(3); + + OperationProcessor operationProcessor = new OperationProcessor( + new IncompleteResultsThrottler(19, 19, null, null), + (docId, documentResult) -> { + countDownLatch.countDown(); + }, + sessionParams, executor); + + // Will fail due to bogus host name, but will be retried. + operationProcessor.sendDocument(doc1); + operationProcessor.sendDocument(doc2); + operationProcessor.sendDocument(doc3); + + // Will create fail results. + operationProcessor.close(); + countDownLatch.await(); + } } |