diff options
author | Haakon Dybdahl <dybdahl@yahoo-inc.com> | 2016-10-05 12:28:26 +0200 |
---|---|---|
committer | Haakon Dybdahl <dybdahl@yahoo-inc.com> | 2016-10-05 12:28:26 +0200 |
commit | 75748253d231e9515de432bb3939c5c72e84eff9 (patch) | |
tree | 020f8f8f980c9df3073afbf07d2e326bedf0cd1e /vespa-http-client | |
parent | 1075f3f0bee109bb96f03b5a5cd50761a6fd098e (diff) |
Do less work on IOThread.close as it is sequential. Catch exception when document queue is closed on retries. Add a unit test.
Diffstat (limited to 'vespa-http-client')
3 files changed, 40 insertions, 22 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index 199f3dcbaa8..414ae90dd27 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -114,8 +114,8 @@ public class ClusterConnection implements AutoCloseable { IOThread ioThread = ioThreads.get(hash % ioThreads.size()); try { ioThread.post(document); - } catch (InterruptedException e) { - throw new EndpointIOException(ioThread.getEndpoint(), "While sending", e); + } catch (Throwable t) { + throw new EndpointIOException(ioThread.getEndpoint(), "While sending", t); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 505039cd2d4..60324eda47a 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -136,33 +136,18 @@ class IOThread implements Runnable, AutoCloseable { stopSignal.countDown(); log.finer("Closed called."); - try { - if (! running.await(2 * localQueueTimeOut, TimeUnit.MILLISECONDS)) { - log.info("Waited " + 2 * localQueueTimeOut - + " ms for queue to be empty, did not happen, interrupting thread."); - } - } catch (InterruptedException e) { - log.log(Level.INFO, "Interrupted while waiting for threads to finish sending.", e); - } - - // Make 5 attempts the next 30 secs to get results from previous operations. - for (int i = 0 ; i < 5; i++) { - int size = resultQueue.getPendingSize(); - if (size == 0) break; - log.info("We have outstanding operations (" + size +") , waiting for responses, iteraton: " + i + "."); + // Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here. + int size = resultQueue.getPendingSize(); + if (size > 0) { + log.info("We have outstanding operations (" + size + ") , trying to fetch responses."); try { processResponse(client.drain()); } catch (Throwable e) { log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e); - break; - } - try { - Thread.sleep(6000); - } catch (InterruptedException e) { - break; } } + try { client.close(); } finally { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java index c87385ec2ce..0eb3fc12405 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java @@ -13,11 +13,16 @@ import org.junit.Test; import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> @@ -367,4 +372,32 @@ public class OperationProcessorTest { assertThat(done.await(120, TimeUnit.SECONDS), is(true)); } + + @Test + public void testSendsResponseToQueuedDocumentOnClose() throws InterruptedException { + SessionParams sessionParams = new SessionParams.Builder() + .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("#$#")).build()) + .build(); + + ScheduledThreadPoolExecutor executor = mock(ScheduledThreadPoolExecutor.class); + when(executor.awaitTermination(anyLong(), any())).thenReturn(true); + + CountDownLatch countDownLatch = new CountDownLatch(3); + + OperationProcessor operationProcessor = new OperationProcessor( + new IncompleteResultsThrottler(19, 19, null, null), + (docId, documentResult) -> { + countDownLatch.countDown(); + }, + sessionParams, executor); + + // Will fail due to bogus host name, but will be retried. + operationProcessor.sendDocument(doc1); + operationProcessor.sendDocument(doc2); + operationProcessor.sendDocument(doc3); + + // Will create fail results. + operationProcessor.close(); + countDownLatch.await(); + } } |