diff options
author | Jon Bratseth <bratseth@verizonmedia.com> | 2019-07-03 15:18:24 -0700 |
---|---|---|
committer | Jon Bratseth <bratseth@verizonmedia.com> | 2019-07-03 15:18:24 -0700 |
commit | 27c50d4fbe892774a22f84a5b42e41bd174642e6 (patch) | |
tree | 1c1f941056931df33e817beb2de754145869e1ba /vespa-http-client | |
parent | 6977ee22909e2fff5438602390a0abaddc040210 (diff) |
Block 1 ms, not 100 ms between cycles
Diffstat (limited to 'vespa-http-client')
6 files changed, 59 insertions, 30 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java index 008f3b63a89..fff0aa910d5 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java @@ -13,13 +13,9 @@ import java.util.concurrent.TimeUnit; */ public final class FeedParams { - public boolean getDenyIfBusyV3() { - return denyIfBusyV3; - } + public boolean getDenyIfBusyV3() { return denyIfBusyV3; } - public long getMaxSleepTimeMs() { - return maxSleepTimeMs; - } + public long getMaxSleepTimeMs() { return maxSleepTimeMs; } public boolean getSilentUpgrade() { return silentUpgrade; } @@ -36,6 +32,7 @@ public final class FeedParams { * Mutable class used to instantiate a {@link FeedParams}. */ public static final class Builder { + private DataFormat dataFormat = DataFormat.JSON_UTF8; private long serverTimeout = TimeUnit.SECONDS.toMillis(180); private long clientTimeout = TimeUnit.SECONDS.toMillis(20); @@ -57,7 +54,7 @@ public final class FeedParams { * @return this, for chaining */ @Beta - public Builder withSilentUpgrade(boolean silentUpgrade) { + public Builder setSilentUpgrade(boolean silentUpgrade) { this.silentUpgrade = silentUpgrade; return this; } @@ -165,6 +162,7 @@ public final class FeedParams { /** * Sets the maximum number of operations to be in-flight. + * * @param maxInFlightRequests max number of operations. * @return this, for chaining */ @@ -246,11 +244,14 @@ public final class FeedParams { return maxChunkSizeBytes; } - public int getmaxInFlightRequests() { + public int getMaxInFlightRequests() { return maxInFlightRequests; } + } + // NOTE! See toBuilder at the end of this class if you add fields here + private final DataFormat dataFormat; private final long serverTimeoutMillis; private final long clientTimeoutMillis; @@ -263,7 +264,6 @@ public final class FeedParams { private final long maxSleepTimeMs; private final boolean silentUpgrade; - private FeedParams(DataFormat dataFormat, long serverTimeout, long clientTimeout, String route, int maxChunkSizeBytes, final int maxInFlightRequests, long localQueueTimeOut, String priority, boolean denyIfBusyV3, long maxSleepTimeMs, @@ -319,4 +319,20 @@ public final class FeedParams { return localQueueTimeOut; } + /** Returns a builder initialized to the values of this */ + public FeedParams.Builder toBuilder() { + Builder b = new Builder(); + b.setDataFormat(dataFormat); + b.setServerTimeout(serverTimeoutMillis, TimeUnit.MILLISECONDS); + b.setClientTimeout(clientTimeoutMillis, TimeUnit.MILLISECONDS); + b.setRoute(route); + b.setMaxChunkSizeBytes(maxChunkSizeBytes); + b.setMaxInFlightRequests(maxInFlightRequests); + b.setPriority(priority); + b.setDenyIfBusyV3(denyIfBusyV3); + b.setMaxSleepTimeMs(maxSleepTimeMs); + b.setSilentUpgrade(silentUpgrade); + return b; + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java index 48fd21e2b1f..4e1406ab966 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java @@ -133,6 +133,8 @@ public final class SessionParams { } } + // NOTE! See toBuilder at the end of this class if you add fields here + private final List<Cluster> clusters; private final FeedParams feedParams; private final ConnectionParams connectionParams; @@ -179,4 +181,15 @@ public final class SessionParams { return errorReport; } + public Builder toBuilder() { + Builder b = new Builder(); + clusters.forEach(c -> b.addCluster(c)); + b.setFeedParams(feedParams); + b.setConnectionParams(connectionParams); + b.setClientQueueSize(clientQueueSize); + b.setErrorReporter(errorReport); + b.setThrottlerMinSize(throttlerMinSize); + return b; + } + } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index 560bbd536e5..6e1f3419e8e 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -70,16 +70,15 @@ public class ClusterConnection implements AutoCloseable { if (documentQueue == null) { documentQueue = new DocumentQueue(clientQueueSizePerCluster); } - IOThread ioThread = new IOThread( - operationProcessor.getIoThreadGroup(), - endpointResultQueue, - gatewayConnection, - clusterId, - feedParams.getMaxChunkSizeBytes(), - maxInFlightPerSession, - feedParams.getLocalQueueTimeOut(), - documentQueue, - feedParams.getMaxSleepTimeMs()); + IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(), + endpointResultQueue, + gatewayConnection, + clusterId, + feedParams.getMaxChunkSizeBytes(), + maxInFlightPerSession, + feedParams.getLocalQueueTimeOut(), + documentQueue, + feedParams.getMaxSleepTimeMs()); ioThreads.add(ioThread); } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index c72a313a4b7..8ec4f6cb7f4 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -53,7 +53,7 @@ class IOThread implements Runnable, AutoCloseable { private final AtomicInteger docsReceivedCounter = new AtomicInteger(0); private final AtomicInteger statusReceivedCounter = new AtomicInteger(0); private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0); - private final AtomicInteger successfullHandshakes = new AtomicInteger(0); + private final AtomicInteger successfulHandshakes = new AtomicInteger(0); private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0); IOThread(ThreadGroup ioThreadGroup, @@ -131,7 +131,7 @@ class IOThread implements Runnable, AutoCloseable { docsReceivedCounter.get(), statusReceivedCounter.get(), pendingDocumentStatusCount.get(), - successfullHandshakes.get(), + successfulHandshakes.get(), lastGatewayProcessTimeMillis.get()); } @@ -175,7 +175,7 @@ class IOThread implements Runnable, AutoCloseable { } - List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) { + List<Document> getNextDocsForFeeding(long maxWaitUnits, TimeUnit timeUnit) { List<Document> docsForSendChunk = new ArrayList<>(); int chunkSizeBytes = 0; try { @@ -269,14 +269,14 @@ class IOThread implements Runnable, AutoCloseable { return processResponse; } - private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) throws ServerResponseException, IOException { + private ProcessResponse pullAndProcessData(long maxWaitTimeMs) throws ServerResponseException, IOException { int pendingResultQueueSize = resultQueue.getPendingSize(); pendingDocumentStatusCount.set(pendingResultQueueSize); - List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) + List<Document> nextDocsForFeeding = + (pendingResultQueueSize > maxInFlightRequests) ? new ArrayList<>() // The queue is full, will not send more documents. - : getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS); - + : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS); if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) { //we have no unfinished business with the server now. @@ -285,6 +285,7 @@ class IOThread implements Runnable, AutoCloseable { } log.finest("Awaiting " + pendingResultQueueSize + " results."); ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding); + if (pendingResultQueueSize > maxInFlightRequests && processResponse.processResultsCount == 0) { try { // Max outstanding document operations, no more results on server side, wait a bit @@ -316,7 +317,7 @@ class IOThread implements Runnable, AutoCloseable { case CONNECTED: try { client.handshake(); - successfullHandshakes.getAndIncrement(); + successfulHandshakes.getAndIncrement(); } catch (ServerResponseException ser) { executeProblemsCounter.incrementAndGet(); log.info("Handshake did not work out " + endpoint + ": " + Exceptions.toMessageString(ser)); @@ -334,7 +335,7 @@ class IOThread implements Runnable, AutoCloseable { return ThreadState.SESSION_SYNCED; case SESSION_SYNCED: try { - ProcessResponse processResponse = pullAndProcessData(100); + ProcessResponse processResponse = pullAndProcessData(1); gatewayThrottler.handleCall(processResponse.transitiveErrorCount); } catch (ServerResponseException ser) { diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java index a2d5b18999e..388c71087ec 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java @@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client; import com.yahoo.vespa.http.client.config.Cluster; import com.yahoo.vespa.http.client.config.ConnectionParams; import com.yahoo.vespa.http.client.config.Endpoint; +import com.yahoo.vespa.http.client.config.FeedParams; import com.yahoo.vespa.http.client.config.SessionParams; import org.junit.Test; @@ -36,7 +37,6 @@ public class SyncFeedClientTest { .build(); SyncFeedClient feedClient = new SyncFeedClient(sessionParams); - assertFeedSuccessful(feedClient); assertFeedSuccessful(feedClient); // ensure the client can be reused feedClient.close(); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index 3143282081b..5a4c6d05185 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -162,7 +162,7 @@ public class IOThreadTest { @Test public void requireThatEndpointConnectExceptionsArePropagated() - throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { + throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException { when(apacheGatewayConnection.connect()).thenReturn(true); String errorMessage = "generic error message"; IOException cause = new IOException(errorMessage); |