diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-08-03 10:46:02 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-08-03 10:46:02 +0000 |
commit | beb82e30b32d2210686edf37b34c3ecca21a412c (patch) | |
tree | f372d08150a654d7117b822725260702d4a1d9fa /vespa-http-client | |
parent | cb7bacfafde63f3464e962d21b4619741ae7f942 (diff) |
Change default poll frequency when idle form 1000hz to 10hz.
The cost of a poll is huge as it implies sending an http post.
Synchrous usage might want to reduce this.
Diffstat (limited to 'vespa-http-client')
4 files changed, 39 insertions, 13 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java index 1accbd51ac7..9a67411192a 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/ConnectionParams.java @@ -47,6 +47,7 @@ public final class ConnectionParams { private boolean printTraceToStdErr = true; private boolean useTlsConfigFromEnvironment = false; private Duration connectionTimeToLive = Duration.ofSeconds(15); + private double idlePollFrequency = 10; private Path privateKey; private Path certificate; private Path caCertificates; @@ -258,6 +259,15 @@ public final class ConnectionParams { return this; } + /** + * Set what frequency to poll for async responses. Default is 10 (every 0.1s) + * If latency is important, or using it in a synchronous way (which is not recommended as throughput is priority), + * you can try increasing the frequency. Note that this will incur significantly higher cpu and bandwidth usage. + */ + public void setIdlePollFrequency(double idlePollFrequency) { + this.idlePollFrequency = idlePollFrequency; + } + public ConnectionParams build() { return new ConnectionParams( sslContext, @@ -278,7 +288,8 @@ public final class ConnectionParams { traceEveryXOperation, printTraceToStdErr, useTlsConfigFromEnvironment, - connectionTimeToLive); + connectionTimeToLive, + idlePollFrequency); } public int getNumPersistentConnectionsPerEndpoint() { @@ -349,6 +360,7 @@ public final class ConnectionParams { private final boolean printTraceToStdErr; private final boolean useTlsConfigFromEnvironment; private final Duration connectionTimeToLive; + private final double idlePollFrequency; private ConnectionParams( SSLContext sslContext, @@ -367,7 +379,8 @@ public final class ConnectionParams { int traceEveryXOperation, boolean printTraceToStdErr, boolean useTlsConfigFromEnvironment, - Duration connectionTimeToLive) { + Duration connectionTimeToLive, + double idlePollFrequency) { this.sslContext = sslContext; this.privateKey = privateKey; this.certificate = certificate; @@ -387,6 +400,7 @@ public final class ConnectionParams { this.traceLevel = traceLevel; this.traceEveryXOperation = traceEveryXOperation; this.printTraceToStdErr = printTraceToStdErr; + this.idlePollFrequency = idlePollFrequency; } @JsonIgnore @@ -455,6 +469,8 @@ public final class ConnectionParams { return connectionTimeToLive; } + public double getIdlePollFrequency() { return idlePollFrequency; } + /** * A header provider that provides a header value. {@link #getHeaderValue()} is called each time a new HTTP request * is constructed by {@link com.yahoo.vespa.http.client.FeedClient}. diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index 98755320d74..059542e56f1 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -83,7 +83,8 @@ public class ClusterConnection implements AutoCloseable { maxInFlightPerSession, feedParams.getLocalQueueTimeOut(), documentQueue, - feedParams.getMaxSleepTimeMs()); + feedParams.getMaxSleepTimeMs(), + connectionParams.getIdlePollFrequency() ); ioThreads.add(ioThread); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 44799e598b0..ae6ae0ca5c9 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -44,6 +44,7 @@ class IOThread implements Runnable, AutoCloseable { private final int maxInFlightRequests; private final long localQueueTimeOut; private final GatewayThrottler gatewayThrottler; + private final long pollIntervalUS; private final Random random = new Random(); private enum ThreadState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; @@ -65,7 +66,8 @@ class IOThread implements Runnable, AutoCloseable { int maxInFlightRequests, long localQueueTimeOut, DocumentQueue documentQueue, - long maxSleepTimeMs) { + long maxSleepTimeMs, + double idlePollFrequency) { this.documentQueue = documentQueue; this.endpoint = client.getEndpoint(); this.client = client; @@ -74,6 +76,9 @@ class IOThread implements Runnable, AutoCloseable { this.maxChunkSizeBytes = maxChunkSizeBytes; this.maxInFlightRequests = maxInFlightRequests; this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); + //Ensure that pollInterval is in the range [1us, 10s] + this.pollIntervalUS = Math.max(1, (long)(1000000.0/Math.max(0.1, idlePollFrequency))); + this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); thread.setDaemon(true); this.localQueueTimeOut = localQueueTimeOut; @@ -278,13 +283,13 @@ class IOThread implements Runnable, AutoCloseable { return processResponse; } - private ProcessResponse pullAndProcessData(long maxWaitTimeMs) throws ServerResponseException, IOException { + private ProcessResponse pullAndProcessData(long maxWaitTimeUS) throws ServerResponseException, IOException { int pendingResultQueueSize = resultQueue.getPendingSize(); pendingDocumentStatusCount.set(pendingResultQueueSize); List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) ? new ArrayList<>() // The queue is full, will not send more documents - : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS); + : getNextDocsForFeeding(maxWaitTimeUS, TimeUnit.MICROSECONDS); if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) { //we have no unfinished business with the server now. @@ -349,7 +354,7 @@ class IOThread implements Runnable, AutoCloseable { return ThreadState.SESSION_SYNCED; case SESSION_SYNCED: try { - ProcessResponse processResponse = pullAndProcessData(1); + ProcessResponse processResponse = pullAndProcessData(pollIntervalUS); gatewayThrottler.handleCall(processResponse.transitiveErrorCount); } catch (ServerResponseException ser) { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 4fb66082cf4..e81638ded1c 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -85,6 +85,10 @@ public class IOThreadTest { }).when(endpointResultQueue).resultReceived(any(), eq(0)); } + private IOThread createIOThread(int maxInFlightRequests, long localQueueTimeOut) { + return new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, maxInFlightRequests, localQueueTimeOut, documentQueue, 0, 10); + } + @Test public void singleDocumentSuccess() throws Exception { when(apacheGatewayConnection.connect()).thenReturn(true); @@ -92,7 +96,7 @@ public class IOThreadTest { (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8)); when(apacheGatewayConnection.writeOperations(any())).thenReturn(serverResponse); setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage); - try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { + try (IOThread ioThread = createIOThread(10000, 10000)) { ioThread.post(doc1); assert (latch.await(120, TimeUnit.SECONDS)); } @@ -103,7 +107,7 @@ public class IOThreadTest { when(apacheGatewayConnection.connect()).thenReturn(true); when(apacheGatewayConnection.writeOperations(any())).thenThrow(new IOException(exceptionMessage)); setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage); - try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { + try (IOThread ioThread = createIOThread(10000, 10000)) { ioThread.post(doc1); assert (latch.await(120, TimeUnit.SECONDS)); } @@ -120,7 +124,7 @@ public class IOThreadTest { latch = new CountDownLatch(2); setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage); - try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { + try (IOThread ioThread = createIOThread(10000, 10000)) { ioThread.post(doc1); ioThread.post(doc2); assert (latch.await(120, TimeUnit.SECONDS)); @@ -136,7 +140,7 @@ public class IOThreadTest { .thenReturn(serverResponse); setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, "java.lang.Exception: Not sending document operation, timed out in queue after"); - try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + try (IOThread ioThread = createIOThread(10, 10)) { ioThread.post(doc1); assert (latch.await(120, TimeUnit.SECONDS)); } @@ -151,7 +155,7 @@ public class IOThreadTest { doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake(); Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + try (IOThread ioThread = createIOThread(10, 10)) { ioThread.post(doc1); FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); assertThat(reportedException, instanceOf(FeedProtocolException.class)); @@ -172,7 +176,7 @@ public class IOThreadTest { doThrow(cause).when(apacheGatewayConnection).handshake(); Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + try (IOThread ioThread = createIOThread(10, 10)) { ioThread.post(doc1); FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); assertThat(reportedException, instanceOf(FeedConnectException.class)); |