summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@gmail.com>2020-04-15 16:54:14 +0200
committerJon Bratseth <bratseth@gmail.com>2020-04-15 16:54:14 +0200
commit748fcdf0901196fb6c7339ffdaf45bd47c56c78b (patch)
tree851a28bf7f5442bb5421abfe47b964310759a7c8 /vespa-http-client
parent874968628cb824e2a8dc99e1b9148e61c09a867e (diff)
Randomize chunk size
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java16
1 files changed, 14 insertions, 2 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index dfc5d0d0a66..a6460c62375 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -17,7 +17,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@@ -43,6 +45,7 @@ class IOThread implements Runnable, AutoCloseable {
private final int maxInFlightRequests;
private final long localQueueTimeOut;
private final GatewayThrottler gatewayThrottler;
+ private final Random random = new Random();
private enum ThreadState { DISCONNECTED, CONNECTED, SESSION_SYNCED };
private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
@@ -188,9 +191,13 @@ class IOThread implements Runnable, AutoCloseable {
return docsForSendChunk;
}
int pendingSize = 1 + resultQueue.getPendingSize();
- // see if we can get more documents without blocking
- while (chunkSizeBytes < maxChunkSizeBytes && pendingSize < maxInFlightRequests) {
+ // see if we can get more documents without blocking
+ // slightly randomize how much is taken to avoid harmonic interactions leading
+ // to some threads consistently taking more than others
+ int thisMaxChunkSizeBytes = randomize(maxChunkSizeBytes);
+ int thisMaxInFlightRequests = randomize(maxInFlightRequests);
+ while (chunkSizeBytes < thisMaxChunkSizeBytes && pendingSize < thisMaxInFlightRequests) {
drainFirstDocumentsInQueueIfOld();
Document document = documentQueue.poll();
if (document == null) break;
@@ -204,6 +211,11 @@ class IOThread implements Runnable, AutoCloseable {
return docsForSendChunk;
}
+ private int randomize(int limit) {
+ double multiplier = 0.75 + 0.25 * random.nextDouble();
+ return Math.max(1, (int)(limit * multiplier));
+ }
+
private void addDocumentsToResultQueue(List<Document> docs) {
for (Document doc : docs) {
resultQueue.operationSent(doc.getOperationId());