diff options
Diffstat (limited to 'vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java')
-rw-r--r-- | vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java | 93 |
1 files changed, 43 insertions, 50 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 8c4ff3ae108..25b0ef20e81 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -24,7 +24,7 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * Class for handling asynchronous feeding of new documents and processing of results. + * Thread which feeds document operations asynchronously and processes the results. * * @author Einar M R Rosenvinge */ @@ -56,16 +56,15 @@ class IOThread implements Runnable, AutoCloseable { private final AtomicInteger successfullHandshakes = new AtomicInteger(0); private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0); - IOThread( - ThreadGroup ioThreadGroup, - EndpointResultQueue endpointResultQueue, - GatewayConnection client, - int clusterId, - int maxChunkSizeBytes, - int maxInFlightRequests, - long localQueueTimeOut, - DocumentQueue documentQueue, - long maxSleepTimeMs) { + IOThread(ThreadGroup ioThreadGroup, + EndpointResultQueue endpointResultQueue, + GatewayConnection client, + int clusterId, + int maxChunkSizeBytes, + int maxInFlightRequests, + long localQueueTimeOut, + DocumentQueue documentQueue, + long maxSleepTimeMs) { this.documentQueue = documentQueue; this.endpoint = client.getEndpoint(); this.client = client; @@ -85,27 +84,26 @@ class IOThread implements Runnable, AutoCloseable { return endpoint; } - public static class ConnectionStats { - public final int wrongSessionDetectedCounter; - public final int wrongVersionDetectedCounter; - public final int problemStatusCodeFromServerCounter; - public final int executeProblemsCounter; - public final int docsReceivedCounter; - public final int statusReceivedCounter; - public final int pendingDocumentStatusCount; - public final int successfullHandshakes; - public final int lastGatewayProcessTimeMillis; - - protected ConnectionStats( - final int wrongSessionDetectedCounter, - final int wrongVersionDetectedCounter, - final int problemStatusCodeFromServerCounter, - final int executeProblemsCounter, - final int docsReceivedCounter, - final int statusReceivedCounter, - final int pendingDocumentStatusCount, - final int successfullHandshakes, - final int lastGatewayProcessTimeMillis) { + static class ConnectionStats { + final int wrongSessionDetectedCounter; + final int wrongVersionDetectedCounter; + final int problemStatusCodeFromServerCounter; + final int executeProblemsCounter; + final int docsReceivedCounter; + final int statusReceivedCounter; + final int pendingDocumentStatusCount; + final int successfullHandshakes; + final int lastGatewayProcessTimeMillis; + + ConnectionStats(int wrongSessionDetectedCounter, + int wrongVersionDetectedCounter, + int problemStatusCodeFromServerCounter, + int executeProblemsCounter, + int docsReceivedCounter, + int statusReceivedCounter, + int pendingDocumentStatusCount, + int successfullHandshakes, + int lastGatewayProcessTimeMillis) { this.wrongSessionDetectedCounter = wrongSessionDetectedCounter; this.wrongVersionDetectedCounter = wrongVersionDetectedCounter; this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter; @@ -137,9 +135,7 @@ class IOThread implements Runnable, AutoCloseable { @Override public void close() { documentQueue.close(); - if (stopSignal.getCount() == 0) { - return; - } + if (stopSignal.getCount() == 0) return; stopSignal.countDown(); log.finer("Closed called."); @@ -166,8 +162,7 @@ class IOThread implements Runnable, AutoCloseable { log.fine("Session to " + endpoint + " closed."); } - - public void post(final Document document) throws InterruptedException { + public void post(Document document) throws InterruptedException { documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup); } @@ -178,7 +173,7 @@ class IOThread implements Runnable, AutoCloseable { List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) { - final List<Document> docsForSendChunk = new ArrayList<>(); + List<Document> docsForSendChunk = new ArrayList<>(); int chunkSizeBytes = 0; try { drainFirstDocumentsInQueueIfOld(); @@ -214,8 +209,7 @@ class IOThread implements Runnable, AutoCloseable { } } - private void markDocumentAsFailed( - List<Document> docs, ServerResponseException servletException) { + private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) { for (Document doc : docs) { resultQueue.failOperation( EndPointResultFactory.createTransientError( @@ -223,8 +217,7 @@ class IOThread implements Runnable, AutoCloseable { } } - private InputStream sendAndReceive(List<Document> docs) - throws IOException, ServerResponseException { + private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException { try { // Post the new docs and get async responses for other posts. return client.writeOperations(docs); @@ -238,17 +231,19 @@ class IOThread implements Runnable, AutoCloseable { } private static class ProcessResponse { + private final int transitiveErrorCount; private final int processResultsCount; + ProcessResponse(int transitiveErrorCount, int processResultsCount) { this.transitiveErrorCount = transitiveErrorCount; this.processResultsCount = processResultsCount; } + } private ProcessResponse processResponse(InputStream serverResponse) throws IOException { - final Collection<EndpointResult> endpointResults = - EndPointResultFactory.createResult(endpoint, serverResponse); + Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse); statusReceivedCounter.addAndGet(endpointResults.size()); int transientErrors = 0; for (EndpointResult endpointResult : endpointResults) { @@ -271,9 +266,8 @@ class IOThread implements Runnable, AutoCloseable { return processResponse; } - private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) - throws ServerResponseException, IOException { - final int pendingResultQueueSize = resultQueue.getPendingSize(); + private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) throws ServerResponseException, IOException { + int pendingResultQueueSize = resultQueue.getPendingSize(); pendingDocumentStatusCount.set(pendingResultQueueSize); List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests) @@ -387,9 +381,8 @@ class IOThread implements Runnable, AutoCloseable { private void drainFirstDocumentsInQueueIfOld() { while (true) { Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut); - if (! document.isPresent()) { - return; - } + if ( ! document.isPresent()) return; + EndpointResult endpointResult = EndPointResultFactory.createTransientError( endpoint, document.get().getOperationId(), new Exception("Not sending document operation, timed out in queue after " |