summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@verizonmedia.com>2019-07-03 15:18:24 -0700
committerJon Bratseth <bratseth@verizonmedia.com>2019-07-03 15:18:24 -0700
commit27c50d4fbe892774a22f84a5b42e41bd174642e6 (patch)
tree1c1f941056931df33e817beb2de754145869e1ba /vespa-http-client
parent6977ee22909e2fff5438602390a0abaddc040210 (diff)
Block 1 ms, not 100 ms between cycles
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java34
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java19
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java19
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java2
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java2
6 files changed, 59 insertions, 30 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
index 008f3b63a89..fff0aa910d5 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/FeedParams.java
@@ -13,13 +13,9 @@ import java.util.concurrent.TimeUnit;
*/
public final class FeedParams {
- public boolean getDenyIfBusyV3() {
- return denyIfBusyV3;
- }
+ public boolean getDenyIfBusyV3() { return denyIfBusyV3; }
- public long getMaxSleepTimeMs() {
- return maxSleepTimeMs;
- }
+ public long getMaxSleepTimeMs() { return maxSleepTimeMs; }
public boolean getSilentUpgrade() { return silentUpgrade; }
@@ -36,6 +32,7 @@ public final class FeedParams {
* Mutable class used to instantiate a {@link FeedParams}.
*/
public static final class Builder {
+
private DataFormat dataFormat = DataFormat.JSON_UTF8;
private long serverTimeout = TimeUnit.SECONDS.toMillis(180);
private long clientTimeout = TimeUnit.SECONDS.toMillis(20);
@@ -57,7 +54,7 @@ public final class FeedParams {
* @return this, for chaining
*/
@Beta
- public Builder withSilentUpgrade(boolean silentUpgrade) {
+ public Builder setSilentUpgrade(boolean silentUpgrade) {
this.silentUpgrade = silentUpgrade;
return this;
}
@@ -165,6 +162,7 @@ public final class FeedParams {
/**
* Sets the maximum number of operations to be in-flight.
+ *
* @param maxInFlightRequests max number of operations.
* @return this, for chaining
*/
@@ -246,11 +244,14 @@ public final class FeedParams {
return maxChunkSizeBytes;
}
- public int getmaxInFlightRequests() {
+ public int getMaxInFlightRequests() {
return maxInFlightRequests;
}
+
}
+ // NOTE! See toBuilder at the end of this class if you add fields here
+
private final DataFormat dataFormat;
private final long serverTimeoutMillis;
private final long clientTimeoutMillis;
@@ -263,7 +264,6 @@ public final class FeedParams {
private final long maxSleepTimeMs;
private final boolean silentUpgrade;
-
private FeedParams(DataFormat dataFormat, long serverTimeout, long clientTimeout, String route,
int maxChunkSizeBytes, final int maxInFlightRequests,
long localQueueTimeOut, String priority, boolean denyIfBusyV3, long maxSleepTimeMs,
@@ -319,4 +319,20 @@ public final class FeedParams {
return localQueueTimeOut;
}
+ /** Returns a builder initialized to the values of this */
+ public FeedParams.Builder toBuilder() {
+ Builder b = new Builder();
+ b.setDataFormat(dataFormat);
+ b.setServerTimeout(serverTimeoutMillis, TimeUnit.MILLISECONDS);
+ b.setClientTimeout(clientTimeoutMillis, TimeUnit.MILLISECONDS);
+ b.setRoute(route);
+ b.setMaxChunkSizeBytes(maxChunkSizeBytes);
+ b.setMaxInFlightRequests(maxInFlightRequests);
+ b.setPriority(priority);
+ b.setDenyIfBusyV3(denyIfBusyV3);
+ b.setMaxSleepTimeMs(maxSleepTimeMs);
+ b.setSilentUpgrade(silentUpgrade);
+ return b;
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
index 48fd21e2b1f..4e1406ab966 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/config/SessionParams.java
@@ -133,6 +133,8 @@ public final class SessionParams {
}
}
+ // NOTE! See toBuilder at the end of this class if you add fields here
+
private final List<Cluster> clusters;
private final FeedParams feedParams;
private final ConnectionParams connectionParams;
@@ -179,4 +181,15 @@ public final class SessionParams {
return errorReport;
}
+ public Builder toBuilder() {
+ Builder b = new Builder();
+ clusters.forEach(c -> b.addCluster(c));
+ b.setFeedParams(feedParams);
+ b.setConnectionParams(connectionParams);
+ b.setClientQueueSize(clientQueueSize);
+ b.setErrorReporter(errorReport);
+ b.setThrottlerMinSize(throttlerMinSize);
+ return b;
+ }
+
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index 560bbd536e5..6e1f3419e8e 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -70,16 +70,15 @@ public class ClusterConnection implements AutoCloseable {
if (documentQueue == null) {
documentQueue = new DocumentQueue(clientQueueSizePerCluster);
}
- IOThread ioThread = new IOThread(
- operationProcessor.getIoThreadGroup(),
- endpointResultQueue,
- gatewayConnection,
- clusterId,
- feedParams.getMaxChunkSizeBytes(),
- maxInFlightPerSession,
- feedParams.getLocalQueueTimeOut(),
- documentQueue,
- feedParams.getMaxSleepTimeMs());
+ IOThread ioThread = new IOThread(operationProcessor.getIoThreadGroup(),
+ endpointResultQueue,
+ gatewayConnection,
+ clusterId,
+ feedParams.getMaxChunkSizeBytes(),
+ maxInFlightPerSession,
+ feedParams.getLocalQueueTimeOut(),
+ documentQueue,
+ feedParams.getMaxSleepTimeMs());
ioThreads.add(ioThread);
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index c72a313a4b7..8ec4f6cb7f4 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -53,7 +53,7 @@ class IOThread implements Runnable, AutoCloseable {
private final AtomicInteger docsReceivedCounter = new AtomicInteger(0);
private final AtomicInteger statusReceivedCounter = new AtomicInteger(0);
private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0);
- private final AtomicInteger successfullHandshakes = new AtomicInteger(0);
+ private final AtomicInteger successfulHandshakes = new AtomicInteger(0);
private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);
IOThread(ThreadGroup ioThreadGroup,
@@ -131,7 +131,7 @@ class IOThread implements Runnable, AutoCloseable {
docsReceivedCounter.get(),
statusReceivedCounter.get(),
pendingDocumentStatusCount.get(),
- successfullHandshakes.get(),
+ successfulHandshakes.get(),
lastGatewayProcessTimeMillis.get());
}
@@ -175,7 +175,7 @@ class IOThread implements Runnable, AutoCloseable {
}
- List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) {
+ List<Document> getNextDocsForFeeding(long maxWaitUnits, TimeUnit timeUnit) {
List<Document> docsForSendChunk = new ArrayList<>();
int chunkSizeBytes = 0;
try {
@@ -269,14 +269,14 @@ class IOThread implements Runnable, AutoCloseable {
return processResponse;
}
- private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) throws ServerResponseException, IOException {
+ private ProcessResponse pullAndProcessData(long maxWaitTimeMs) throws ServerResponseException, IOException {
int pendingResultQueueSize = resultQueue.getPendingSize();
pendingDocumentStatusCount.set(pendingResultQueueSize);
- List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests)
+ List<Document> nextDocsForFeeding =
+ (pendingResultQueueSize > maxInFlightRequests)
? new ArrayList<>() // The queue is full, will not send more documents.
- : getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS);
-
+ : getNextDocsForFeeding(maxWaitTimeMs, TimeUnit.MILLISECONDS);
if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) {
//we have no unfinished business with the server now.
@@ -285,6 +285,7 @@ class IOThread implements Runnable, AutoCloseable {
}
log.finest("Awaiting " + pendingResultQueueSize + " results.");
ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding);
+
if (pendingResultQueueSize > maxInFlightRequests && processResponse.processResultsCount == 0) {
try {
// Max outstanding document operations, no more results on server side, wait a bit
@@ -316,7 +317,7 @@ class IOThread implements Runnable, AutoCloseable {
case CONNECTED:
try {
client.handshake();
- successfullHandshakes.getAndIncrement();
+ successfulHandshakes.getAndIncrement();
} catch (ServerResponseException ser) {
executeProblemsCounter.incrementAndGet();
log.info("Handshake did not work out " + endpoint + ": " + Exceptions.toMessageString(ser));
@@ -334,7 +335,7 @@ class IOThread implements Runnable, AutoCloseable {
return ThreadState.SESSION_SYNCED;
case SESSION_SYNCED:
try {
- ProcessResponse processResponse = pullAndProcessData(100);
+ ProcessResponse processResponse = pullAndProcessData(1);
gatewayThrottler.handleCall(processResponse.transitiveErrorCount);
}
catch (ServerResponseException ser) {
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
index a2d5b18999e..388c71087ec 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/SyncFeedClientTest.java
@@ -3,6 +3,7 @@ package com.yahoo.vespa.http.client;
import com.yahoo.vespa.http.client.config.Cluster;
import com.yahoo.vespa.http.client.config.ConnectionParams;
import com.yahoo.vespa.http.client.config.Endpoint;
+import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.config.SessionParams;
import org.junit.Test;
@@ -36,7 +37,6 @@ public class SyncFeedClientTest {
.build();
SyncFeedClient feedClient = new SyncFeedClient(sessionParams);
-
assertFeedSuccessful(feedClient);
assertFeedSuccessful(feedClient); // ensure the client can be reused
feedClient.close();
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index 3143282081b..5a4c6d05185 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -162,7 +162,7 @@ public class IOThreadTest {
@Test
public void requireThatEndpointConnectExceptionsArePropagated()
- throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
+ throws IOException, ServerResponseException, InterruptedException, TimeoutException, ExecutionException {
when(apacheGatewayConnection.connect()).thenReturn(true);
String errorMessage = "generic error message";
IOException cause = new IOException(errorMessage);