summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorVegard Sjonfjell <vegard@yahoo-inc.com>2016-10-05 15:34:15 +0200
committerVegard Sjonfjell <vegard@yahoo-inc.com>2016-10-05 15:34:15 +0200
commitd22ceb89608611124291c5c9e30f7f70bac8aa98 (patch)
tree960ff704f4fc0f0fdb9dcc776e87e957b420853b /vespa-http-client
parent23cd65bb2d4a25f2d52a70f573ce4a3e25ee6b8c (diff)
parente8b571ebc3eb2592f16ca546a65bf318ba0f4df7 (diff)
Merge branch 'master' into voffeloff/move-jsontesthelper
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java5
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java4
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java25
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java33
4 files changed, 43 insertions, 24 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
index e6ff8593e71..d7f001eff31 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/api/FeedClientImpl.java
@@ -27,8 +27,9 @@ public class FeedClientImpl implements FeedClient {
public FeedClientImpl(
SessionParams sessionParams, ResultCallback resultCallback, ScheduledThreadPoolExecutor timeoutExecutor) {
- this.closeTimeoutMs = sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
- sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS);
+ this.closeTimeoutMs = sessionParams.getConnectionParams().getMaxRetries() * (
+ sessionParams.getFeedParams().getServerTimeout(TimeUnit.MILLISECONDS) +
+ sessionParams.getFeedParams().getClientTimeout(TimeUnit.MILLISECONDS));
this.operationProcessor = new OperationProcessor(
new IncompleteResultsThrottler(
sessionParams.getThrottlerMinSize(),
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index 199f3dcbaa8..414ae90dd27 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -114,8 +114,8 @@ public class ClusterConnection implements AutoCloseable {
IOThread ioThread = ioThreads.get(hash % ioThreads.size());
try {
ioThread.post(document);
- } catch (InterruptedException e) {
- throw new EndpointIOException(ioThread.getEndpoint(), "While sending", e);
+ } catch (Throwable t) {
+ throw new EndpointIOException(ioThread.getEndpoint(), "While sending", t);
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 505039cd2d4..60324eda47a 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -136,33 +136,18 @@ class IOThread implements Runnable, AutoCloseable {
stopSignal.countDown();
log.finer("Closed called.");
- try {
- if (! running.await(2 * localQueueTimeOut, TimeUnit.MILLISECONDS)) {
- log.info("Waited " + 2 * localQueueTimeOut
- + " ms for queue to be empty, did not happen, interrupting thread.");
- }
- } catch (InterruptedException e) {
- log.log(Level.INFO, "Interrupted while waiting for threads to finish sending.", e);
- }
-
- // Make 5 attempts the next 30 secs to get results from previous operations.
- for (int i = 0 ; i < 5; i++) {
- int size = resultQueue.getPendingSize();
- if (size == 0) break;
- log.info("We have outstanding operations (" + size +") , waiting for responses, iteraton: " + i + ".");
+ // Make a last attempt to get results from previous operations, we have already waited quite a bit before getting here.
+ int size = resultQueue.getPendingSize();
+ if (size > 0) {
+ log.info("We have outstanding operations (" + size + ") , trying to fetch responses.");
try {
processResponse(client.drain());
} catch (Throwable e) {
log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", e);
- break;
- }
- try {
- Thread.sleep(6000);
- } catch (InterruptedException e) {
- break;
}
}
+
try {
client.close();
} finally {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
index c87385ec2ce..0eb3fc12405 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessorTest.java
@@ -13,11 +13,16 @@ import org.junit.Test;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
@@ -367,4 +372,32 @@ public class OperationProcessorTest {
assertThat(done.await(120, TimeUnit.SECONDS), is(true));
}
+
+ @Test
+ public void testSendsResponseToQueuedDocumentOnClose() throws InterruptedException {
+ SessionParams sessionParams = new SessionParams.Builder()
+ .addCluster(new Cluster.Builder().addEndpoint(Endpoint.create("#$#")).build())
+ .build();
+
+ ScheduledThreadPoolExecutor executor = mock(ScheduledThreadPoolExecutor.class);
+ when(executor.awaitTermination(anyLong(), any())).thenReturn(true);
+
+ CountDownLatch countDownLatch = new CountDownLatch(3);
+
+ OperationProcessor operationProcessor = new OperationProcessor(
+ new IncompleteResultsThrottler(19, 19, null, null),
+ (docId, documentResult) -> {
+ countDownLatch.countDown();
+ },
+ sessionParams, executor);
+
+ // Will fail due to bogus host name, but will be retried.
+ operationProcessor.sendDocument(doc1);
+ operationProcessor.sendDocument(doc2);
+ operationProcessor.sendDocument(doc3);
+
+ // Will create fail results.
+ operationProcessor.close();
+ countDownLatch.await();
+ }
}