diff options
author | Haakon Dybdahl <dybdahl@yahoo-inc.com> | 2016-10-24 13:53:01 +0200 |
---|---|---|
committer | Haakon Dybdahl <dybdahl@yahoo-inc.com> | 2016-10-24 13:53:01 +0200 |
commit | f4569d65f66d2f86048ad68311aa29a4bfddad8d (patch) | |
tree | 46a0ead34072982f08e31b9c4c1e7c83fe89d828 /vespa-http-client/src | |
parent | d3128af7e81b9c92e07176520e85ecec4158a71c (diff) |
Only sleep if there are nothing processed and max concurrent documents.
Diffstat (limited to 'vespa-http-client/src')
-rw-r--r-- | vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java | 61 |
1 files changed, 37 insertions, 24 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 60324eda47a..ce86d9ce043 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -172,14 +172,6 @@ class IOThread implements Runnable, AutoCloseable { List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) { final List<Document> docsForSendChunk = new ArrayList<>(); - if (resultQueue.getPendingSize() > maxInFlightRequests) { - - // The queue is full do some sleep just to reduce network usage. - try { - stopSignal.await(300, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { /* Ignore */ } - return docsForSendChunk; - } int chunkSizeBytes = 0; try { drainFirstDocumentInQueueIfOld(); @@ -238,8 +230,16 @@ class IOThread implements Runnable, AutoCloseable { } } - // Return number of transient errors. - private int processResponse(InputStream serverResponse) throws IOException { + private static class ProcessResponse { + private final int transitiveErrorCount; + private final int processResultsCount; + ProcessResponse(int transitiveErrorCount, int processResultsCount) { + this.transitiveErrorCount = transitiveErrorCount; + this.processResultsCount = processResultsCount; + } + } + + private ProcessResponse processResponse(InputStream serverResponse) throws IOException { final Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse); statusReceivedCounter.addAndGet(endpointResults.size()); @@ -251,36 +251,49 @@ class IOThread implements Runnable, AutoCloseable { } resultQueue.resultReceived(endpointResult, clusterId); } - return transientErrors; + return new ProcessResponse(transientErrors, endpointResults.size()); } - // Returns number of transient errors. - private int feedDocumentAndProcessResults(List<Document> docs) + private ProcessResponse feedDocumentAndProcessResults(List<Document> docs) throws ServerResponseException, IOException { addDocumentsToResultQueue(docs); long startTime = System.currentTimeMillis(); InputStream serverResponse = sendAndReceive(docs); - int transientErrors = processResponse(serverResponse); + ProcessResponse processResponse = processResponse(serverResponse); lastGatewayProcessTimeMillis.set((int) (System.currentTimeMillis() - startTime)); - return transientErrors; + return processResponse; } - // Returns number of transient errors. - private int pullAndProcessData(int maxWaitTimeMilliSecs) + private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) throws ServerResponseException, IOException { - List<Document> nextDocsForFeeding = getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS); - final int pendingResultQueueSize = resultQueue.getPendingSize(); pendingDocumentStatusCount.set(pendingResultQueueSize); + + List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) + ? new ArrayList<>() // The queue is full, not more documents. + : getNextDocsForFeeding(maxWaitTimeMilliSecs, TimeUnit.MILLISECONDS); + + if (nextDocsForFeeding.isEmpty() && pendingResultQueueSize == 0) { //we have no unfinished business with the server now. log.finest("No document awaiting feeding, not waiting for results."); - return 0; + return new ProcessResponse(0, 0); } log.finest("Awaiting " + pendingResultQueueSize + " results."); - return feedDocumentAndProcessResults(nextDocsForFeeding); - + ProcessResponse processResponse = feedDocumentAndProcessResults(nextDocsForFeeding); + if (pendingResultQueueSize > maxInFlightRequests && + processResponse.processResultsCount == 0 && + nextDocsForFeeding.size() == 0) { + try { + // Max outstanding documents operation, nothing has been ready on server side, wait a bit + // before asking again. + Thread.sleep(300); + } catch (InterruptedException e) { + // Ignore + } + } + return processResponse; } private ThreadState cycle(final ThreadState threadState) { @@ -319,8 +332,8 @@ class IOThread implements Runnable, AutoCloseable { case SESSION_SYNCED: final int maxWaitTimeMilliSecs = 100; try { - int transientErrors = pullAndProcessData(maxWaitTimeMilliSecs); - gatewayThrottler.handleCall(transientErrors); + ProcessResponse processResponse = pullAndProcessData(maxWaitTimeMilliSecs); + gatewayThrottler.handleCall(processResponse.transitiveErrorCount); } catch (ServerResponseException ser) { log.severe("Problems while handing data over to gateway " + endpoint + " " + ser.getMessage()); |