summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorHaakon Dybdahl <dybdahl@yahoo-inc.com>2016-10-24 13:53:01 +0200
committerHaakon Dybdahl <dybdahl@yahoo-inc.com>2016-10-24 13:53:01 +0200
commitf4569d65f66d2f86048ad68311aa29a4bfddad8d (patch)
tree46a0ead34072982f08e31b9c4c1e7c83fe89d828 /vespa-http-client
parentd3128af7e81b9c92e07176520e85ecec4158a71c (diff)
Only sleep if there are nothing processed and max concurrent documents.
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java61
1 files changed, 37 insertions, 24 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 60324eda47a..ce86d9ce043 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -172,14 +172,6 @@ class IOThread implements Runnable, AutoCloseable {
List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) {
final List<Document> docsForSendChunk = new ArrayList<>();
- if (resultQueue.getPendingSize() > maxInFlightRequests) {
-
- // The queue is full do some sleep just to reduce network usage.
- try {
- stopSignal.await(300, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) { /* Ignore */ }
- return docsForSendChunk;
- }
int chunkSizeBytes = 0;
try {
drainFirstDocumentInQueueIfOld();
@@ -238,8 +230,16 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- // Return number of transient errors.
- private int processResponse(InputStream serverResponse) throws IOException {
+ private static class ProcessResponse {
+ private final int transitiveErrorCount;
+ private final int processResultsCount;
+ ProcessResponse(int transitiveErrorCount, int processResultsCount) {
+ this.transitiveErrorCount = transitiveErrorCount;
+ this.processResultsCount = processResultsCount;
+ }
+ }
+
+ private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
final Collection<EndpointResult> endpointResults =
EndPointResultFactory.createResult(endpoint, serverResponse);
statusReceivedCounter.addAndGet(endpointResults.size());
@@ -251,36 +251,49 @@ class IOThread implements Runnable, AutoCloseable {
}
resultQueue.resultReceived(endpointResult, clusterId);
}
- return transientErrors;
+ return new ProcessResponse(transientErrors, endpointResults.size());
}
- // Returns number of transient errors.
- private int feedDocumentAndProcessResults(List<Document> docs)
+ private ProcessResponse feedDocumentAndProcessResults(List<Document> docs)
throws ServerResponseException, IOException {
addDocumentsToResultQueue(docs);
long startTime = System.currentTimeMillis();
InputStream serverResponse = sendAndReceive(docs);
- int transientErrors = processResponse(serverResponse);
+ ProcessResponse processResponse = processResponse(serverResponse);
lastGatewayProcessTimeMillis.set((int) (System.currentTimeMillis() - startTime));
- return transientErrors;
+ return processResponse;
}
- // Returns number of transient errors.
- private int pullAndProcessData(int maxWaitTimeMilliSecs)
+ private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs)
throws ServerResponseException, IOException {
- List<Document> nextDocsForFeeding = getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS);
-
final int pendingResultQueueSize = resultQueue.getPendingSize();
pendingDocumentStatusCount.set(pendingResultQueueSize);
+
+ List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests)
+ ? new ArrayList<>() // The queue is full, not more documents.
+ : getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS);
+
+
if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) {
//we have no unfinished business with the server now.
log.finest("No document awaiting feeding, not waiting for results.");
- return 0;
+ return new ProcessResponse(0, 0);
}
log.finest("Awaiting " + pendingResultQueueSize + " results.");
- return feedDocumentAndProcessResults(nextDocsForFeeding);
-
+ ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding);
+ if (pendingResultQueueSize > maxInFlightRequests &&
+ processResponse.processResultsCount == 0 &&
+ nextDocsForFeeding.size() == 0) {
+ try {
+ // Max outstanding documents operation, nothing has been ready on server side, wait a bit
+ // before asking again.
+ Thread.sleep(300);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ return processResponse;
}
private ThreadState cycle(final ThreadState threadState) {
@@ -319,8 +332,8 @@ class IOThread implements Runnable, AutoCloseable {
case SESSION_SYNCED:
final int maxWaitTimeMilliSecs = 100;
try {
- int transientErrors = pullAndProcessData(maxWaitTimeMilliSecs);
- gatewayThrottler.handleCall(transientErrors);
+ ProcessResponse processResponse = pullAndProcessData(maxWaitTimeMilliSecs);
+ gatewayThrottler.handleCall(processResponse.transitiveErrorCount);
}
catch (ServerResponseException ser) {
log.severe("Problems while handing data over to gateway " + endpoint + " " + ser.getMessage());