diff options
author | Jon Bratseth <bratseth@gmail.com> | 2020-04-15 16:54:14 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@gmail.com> | 2020-04-15 16:54:14 +0200 |
commit | 748fcdf0901196fb6c7339ffdaf45bd47c56c78b (patch) | |
tree | 851a28bf7f5442bb5421abfe47b964310759a7c8 /vespa-http-client | |
parent | 874968628cb824e2a8dc99e1b9148e61c09a867e (diff) |
Randomize chunk size
Diffstat (limited to 'vespa-http-client')
-rw-r--r-- | vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java | 16 |
1 files changed, 14 insertions, 2 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index dfc5d0d0a66..a6460c62375 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -17,7 +17,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -43,6 +45,7 @@ class IOThread implements Runnable, AutoCloseable { private final int maxInFlightRequests; private final long localQueueTimeOut; private final GatewayThrottler gatewayThrottler; + private final Random random = new Random(); private enum ThreadState { DISCONNECTED, CONNECTED, SESSION_SYNCED }; private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0); @@ -188,9 +191,13 @@ class IOThread implements Runnable, AutoCloseable { return docsForSendChunk; } int pendingSize = 1 + resultQueue.getPendingSize(); - // see if we can get more documents without blocking - while (chunkSizeBytes < maxChunkSizeBytes && pendingSize < maxInFlightRequests) { + // see if we can get more documents without blocking + // slightly randomize how much is taken to avoid harmonic interactions leading + // to some threads consistently taking more than others + int thisMaxChunkSizeBytes = randomize(maxChunkSizeBytes); + int thisMaxInFlightRequests = randomize(maxInFlightRequests); + while (chunkSizeBytes < thisMaxChunkSizeBytes && pendingSize < thisMaxInFlightRequests) { drainFirstDocumentsInQueueIfOld(); Document document = documentQueue.poll(); if (document == null) break; @@ -204,6 +211,11 @@ class IOThread implements Runnable, AutoCloseable { return docsForSendChunk; } + private int randomize(int limit) { + double multiplier = 0.75 + 0.25 * random.nextDouble(); + return Math.max(1, (int)(limit * multiplier)); + } + private void addDocumentsToResultQueue(List<Document> docs) { for (Document doc : docs) { resultQueue.operationSent(doc.getOperationId()); |