summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-08-03 10:46:02 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-08-03 10:46:02 +0000
commitbeb82e30b32d2210686edf37b34c3ecca21a412c (patch)
treef372d08150a654d7117b822725260702d4a1d9fa /vespa-http-client
parentcb7bacfafde63f3464e962d21b4619741ae7f942 (diff)
Change default poll frequency when idle form 1000hz to 10hz.
The cost of a poll is huge as it implies sending an http post. Synchrous usage might want to reduce this.
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java20
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java3
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java13
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java16
4 files changed, 39 insertions, 13 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
index 1accbd51ac7..9a67411192a 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java
@@ -47,6 +47,7 @@ public final class ConnectionParams {
private boolean printTraceToStdErr = true;
private boolean useTlsConfigFromEnvironment = false;
private Duration connectionTimeToLive = Duration.ofSeconds(15);
+ private double idlePollFrequency = 10;
private Path privateKey;
private Path certificate;
private Path caCertificates;
@@ -258,6 +259,15 @@ public final class ConnectionParams {
return this;
}
+ /**
+ * Set what frequency to poll for async responses. Default is 10 (every 0.1s)
+ * If latency is important, or using it in a synchronous way (which is not recommended as throughput is priority),
+ * you can try increasing the frequency. Note that this will incur significantly higher cpu and bandwidth usage.
+ */
+ public void setIdlePollFrequency(double idlePollFrequency) {
+ this.idlePollFrequency = idlePollFrequency;
+ }
+
public ConnectionParams build() {
return new ConnectionParams(
sslContext,
@@ -278,7 +288,8 @@ public final class ConnectionParams {
traceEveryXOperation,
printTraceToStdErr,
useTlsConfigFromEnvironment,
- connectionTimeToLive);
+ connectionTimeToLive,
+ idlePollFrequency);
}
public int getNumPersistentConnectionsPerEndpoint() {
@@ -349,6 +360,7 @@ public final class ConnectionParams {
private final boolean printTraceToStdErr;
private final boolean useTlsConfigFromEnvironment;
private final Duration connectionTimeToLive;
+ private final double idlePollFrequency;
private ConnectionParams(
SSLContext sslContext,
@@ -367,7 +379,8 @@ public final class ConnectionParams {
int traceEveryXOperation,
boolean printTraceToStdErr,
boolean useTlsConfigFromEnvironment,
- Duration connectionTimeToLive) {
+ Duration connectionTimeToLive,
+ double idlePollFrequency) {
this.sslContext = sslContext;
this.privateKey = privateKey;
this.certificate = certificate;
@@ -387,6 +400,7 @@ public final class ConnectionParams {
this.traceLevel = traceLevel;
this.traceEveryXOperation = traceEveryXOperation;
this.printTraceToStdErr = printTraceToStdErr;
+ this.idlePollFrequency = idlePollFrequency;
}
@JsonIgnore
@@ -455,6 +469,8 @@ public final class ConnectionParams {
return connectionTimeToLive;
}
+ public double getIdlePollFrequency() { return idlePollFrequency; }
+
/**
* A header provider that provides a header value. {@link #getHeaderValue()} is called each time a new HTTP request
* is constructed by {@link com.yahoo.vespa.http.client.FeedClient}.
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index 98755320d74..059542e56f1 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -83,7 +83,8 @@ public class ClusterConnection implements AutoCloseable {
maxInFlightPerSession,
feedParams.getLocalQueueTimeOut(),
documentQueue,
- feedParams.getMaxSleepTimeMs());
+ feedParams.getMaxSleepTimeMs(),
+ connectionParams.getIdlePollFrequency() );
ioThreads.add(ioThread);
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 44799e598b0..ae6ae0ca5c9 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -44,6 +44,7 @@ class IOThread implements Runnable, AutoCloseable {
private final int maxInFlightRequests;
private final long localQueueTimeOut;
private final GatewayThrottler gatewayThrottler;
+ private final long pollIntervalUS;
private final Random random = new Random();
private enum ThreadState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
@@ -65,7 +66,8 @@ class IOThread implements Runnable, AutoCloseable {
int maxInFlightRequests,
long localQueueTimeOut,
DocumentQueue documentQueue,
- long maxSleepTimeMs) {
+ long maxSleepTimeMs,
+ double idlePollFrequency) {
this.documentQueue = documentQueue;
this.endpoint = client.getEndpoint();
this.client = client;
@@ -74,6 +76,9 @@ class IOThread implements Runnable, AutoCloseable {
this.maxChunkSizeBytes = maxChunkSizeBytes;
this.maxInFlightRequests = maxInFlightRequests;
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
+ //Ensure that pollInterval is in the range [1us, 10s]
+ this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency)));
+
this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
thread.setDaemon(true);
this.localQueueTimeOut = localQueueTimeOut;
@@ -278,13 +283,13 @@ class IOThread implements Runnable, AutoCloseable {
return processResponse;
}
- private ProcessResponse pullAndProcessData(long maxWaitTimeMs) throws ServerResponseException, IOException {
+ private ProcessResponse pullAndProcessData(long maxWaitTimeUS) throws ServerResponseException, IOException {
int pendingResultQueueSize = resultQueue.getPendingSize();
pendingDocumentStatusCount.set(pendingResultQueueSize);
List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests)
? new ArrayList<>() // The queue is full, will not send more documents
- : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS);
+ : getNextDocsForFeeding(maxWaitTimeUS, TimeUnit.MICROSECONDS);
if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) {
//we have no unfinished business with the server now.
@@ -349,7 +354,7 @@ class IOThread implements Runnable, AutoCloseable {
return ThreadState.SESSION_SYNCED;
case SESSION_SYNCED:
try {
- ProcessResponse processResponse = pullAndProcessData(1);
+ ProcessResponse processResponse = pullAndProcessData(pollIntervalUS);
gatewayThrottler.handleCall(processResponse.transitiveErrorCount);
}
catch (ServerResponseException ser) {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 4fb66082cf4..e81638ded1c 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -85,6 +85,10 @@ public class IOThreadTest {
}).when(endpointResultQueue).resultReceived(any(), eq(0));
}
+ private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) {
+ return new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, maxInFlightRequests, localQueueTimeOut, documentQueue, 0, 10);
+ }
+
@Test
public void singleDocumentSuccess() throws Exception {
when(apacheGatewayConnection.connect()).thenReturn(true);
@@ -92,7 +96,7 @@ public class IOThreadTest {
(docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
when(apacheGatewayConnection.writeOperations(any())).thenReturn(serverResponse);
setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage);
- try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ try (IOThread ioThread = createIOThread(10000, 10000)) {
ioThread.post(doc1);
assert (latch.await(120, TimeUnit.SECONDS));
}
@@ -103,7 +107,7 @@ public class IOThreadTest {
when(apacheGatewayConnection.connect()).thenReturn(true);
when(apacheGatewayConnection.writeOperations(any())).thenThrow(new IOException(exceptionMessage));
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
- try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ try (IOThread ioThread = createIOThread(10000, 10000)) {
ioThread.post(doc1);
assert (latch.await(120, TimeUnit.SECONDS));
}
@@ -120,7 +124,7 @@ public class IOThreadTest {
latch = new CountDownLatch(2);
setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage);
- try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ try (IOThread ioThread = createIOThread(10000, 10000)) {
ioThread.post(doc1);
ioThread.post(doc2);
assert (latch.await(120, TimeUnit.SECONDS));
@@ -136,7 +140,7 @@ public class IOThreadTest {
.thenReturn(serverResponse);
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
"java.lang.Exception: Not sending document operation, timed out in queue after");
- try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ try (IOThread ioThread = createIOThread(10, 10)) {
ioThread.post(doc1);
assert (latch.await(120, TimeUnit.SECONDS));
}
@@ -151,7 +155,7 @@ public class IOThreadTest {
doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake();
Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
- try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ try (IOThread ioThread = createIOThread(10, 10)) {
ioThread.post(doc1);
FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
assertThat(reportedException, instanceOf(FeedProtocolException.class));
@@ -172,7 +176,7 @@ public class IOThreadTest {
doThrow(cause).when(apacheGatewayConnection).handshake();
Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
- try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ try (IOThread ioThread = createIOThread(10, 10)) {
ioThread.post(doc1);
FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
assertThat(reportedException, instanceOf(FeedConnectException.class));