summaryrefslogtreecommitdiffstats
path: root/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java93
1 files changed, 43 insertions, 50 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 8c4ff3ae108..25b0ef20e81 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -24,7 +24,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * Class for handling asynchronous feeding of new documents and processing of results.
+ * Thread which feeds document operations asynchronously and processes the results.
*
* @author Einar M R Rosenvinge
*/
@@ -56,16 +56,15 @@ class IOThread implements Runnable, AutoCloseable {
private final AtomicInteger successfullHandshakes = new AtomicInteger(0);
private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);
- IOThread(
- ThreadGroup ioThreadGroup,
- EndpointResultQueue endpointResultQueue,
- GatewayConnection client,
- int clusterId,
- int maxChunkSizeBytes,
- int maxInFlightRequests,
- long localQueueTimeOut,
- DocumentQueue documentQueue,
- long maxSleepTimeMs) {
+ IOThread(ThreadGroup ioThreadGroup,
+ EndpointResultQueue endpointResultQueue,
+ GatewayConnection client,
+ int clusterId,
+ int maxChunkSizeBytes,
+ int maxInFlightRequests,
+ long localQueueTimeOut,
+ DocumentQueue documentQueue,
+ long maxSleepTimeMs) {
this.documentQueue = documentQueue;
this.endpoint = client.getEndpoint();
this.client = client;
@@ -85,27 +84,26 @@ class IOThread implements Runnable, AutoCloseable {
return endpoint;
}
- public static class ConnectionStats {
- public final int wrongSessionDetectedCounter;
- public final int wrongVersionDetectedCounter;
- public final int problemStatusCodeFromServerCounter;
- public final int executeProblemsCounter;
- public final int docsReceivedCounter;
- public final int statusReceivedCounter;
- public final int pendingDocumentStatusCount;
- public final int successfullHandshakes;
- public final int lastGatewayProcessTimeMillis;
-
- protected ConnectionStats(
- final int wrongSessionDetectedCounter,
- final int wrongVersionDetectedCounter,
- final int problemStatusCodeFromServerCounter,
- final int executeProblemsCounter,
- final int docsReceivedCounter,
- final int statusReceivedCounter,
- final int pendingDocumentStatusCount,
- final int successfullHandshakes,
- final int lastGatewayProcessTimeMillis) {
+ static class ConnectionStats {
+ final int wrongSessionDetectedCounter;
+ final int wrongVersionDetectedCounter;
+ final int problemStatusCodeFromServerCounter;
+ final int executeProblemsCounter;
+ final int docsReceivedCounter;
+ final int statusReceivedCounter;
+ final int pendingDocumentStatusCount;
+ final int successfullHandshakes;
+ final int lastGatewayProcessTimeMillis;
+
+ ConnectionStats(int wrongSessionDetectedCounter,
+ int wrongVersionDetectedCounter,
+ int problemStatusCodeFromServerCounter,
+ int executeProblemsCounter,
+ int docsReceivedCounter,
+ int statusReceivedCounter,
+ int pendingDocumentStatusCount,
+ int successfullHandshakes,
+ int lastGatewayProcessTimeMillis) {
this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
@@ -137,9 +135,7 @@ class IOThread implements Runnable, AutoCloseable {
@Override
public void close() {
documentQueue.close();
- if (stopSignal.getCount() == 0) {
- return;
- }
+ if (stopSignal.getCount() == 0) return;
stopSignal.countDown();
log.finer("Closed called.");
@@ -166,8 +162,7 @@ class IOThread implements Runnable, AutoCloseable {
log.fine("Session to " + endpoint + " closed.");
}
-
- public void post(final Document document) throws InterruptedException {
+ public void post(Document document) throws InterruptedException {
documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup);
}
@@ -178,7 +173,7 @@ class IOThread implements Runnable, AutoCloseable {
List<Document> getNextDocsForFeeding(int maxWaitUnits, TimeUnit timeUnit) {
- final List<Document> docsForSendChunk = new ArrayList<>();
+ List<Document> docsForSendChunk = new ArrayList<>();
int chunkSizeBytes = 0;
try {
drainFirstDocumentsInQueueIfOld();
@@ -214,8 +209,7 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private void markDocumentAsFailed(
- List<Document> docs, ServerResponseException servletException) {
+ private void markDocumentAsFailed(List<Document> docs, ServerResponseException servletException) {
for (Document doc : docs) {
resultQueue.failOperation(
EndPointResultFactory.createTransientError(
@@ -223,8 +217,7 @@ class IOThread implements Runnable, AutoCloseable {
}
}
- private InputStream sendAndReceive(List<Document> docs)
- throws IOException, ServerResponseException {
+ private InputStream sendAndReceive(List<Document> docs) throws IOException, ServerResponseException {
try {
// Post the new docs and get async responses for other posts.
return client.writeOperations(docs);
@@ -238,17 +231,19 @@ class IOThread implements Runnable, AutoCloseable {
}
private static class ProcessResponse {
+
private final int transitiveErrorCount;
private final int processResultsCount;
+
ProcessResponse(int transitiveErrorCount, int processResultsCount) {
this.transitiveErrorCount = transitiveErrorCount;
this.processResultsCount = processResultsCount;
}
+
}
private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
- final Collection<EndpointResult> endpointResults =
- EndPointResultFactory.createResult(endpoint, serverResponse);
+ Collection<EndpointResult> endpointResults = EndPointResultFactory.createResult(endpoint, serverResponse);
statusReceivedCounter.addAndGet(endpointResults.size());
int transientErrors = 0;
for (EndpointResult endpointResult : endpointResults) {
@@ -271,9 +266,8 @@ class IOThread implements Runnable, AutoCloseable {
return processResponse;
}
- private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs)
- throws ServerResponseException, IOException {
- final int pendingResultQueueSize = resultQueue.getPendingSize();
+ private ProcessResponse pullAndProcessData(int maxWaitTimeMilliSecs) throws ServerResponseException, IOException {
+ int pendingResultQueueSize = resultQueue.getPendingSize();
pendingDocumentStatusCount.set(pendingResultQueueSize);
List<Document> nextDocsForFeeding = (pendingResultQueueSize > maxInFlightRequests)
@@ -387,9 +381,8 @@ class IOThread implements Runnable, AutoCloseable {
private void drainFirstDocumentsInQueueIfOld() {
while (true) {
Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
- if (! document.isPresent()) {
- return;
- }
+ if ( ! document.isPresent()) return;
+
EndpointResult endpointResult = EndPointResultFactory.createTransientError(
endpoint, document.get().getOperationId(),
new Exception("Not sending document operation, timed out in queue after "