summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorHaakon Dybdahl <dybdahl@yahoo-inc.com>2017-03-15 10:47:19 +0100
committerHaakon Dybdahl <dybdahl@yahoo-inc.com>2017-03-15 10:47:19 +0100
commit6c135a1a15306e13c3797aea5352290ec9a6fec0 (patch)
tree6ee6cb8b730f6ea0861789deed01f979a9cb0c6e /vespa-http-client
parenta563306957b180c933e5e2ff68a989b32f02aae7 (diff)
Fix race when two threads are competing for head element in document queue.
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java14
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java35
2 files changed, 27 insertions, 22 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
index cefa8d6e94a..edbd367d186 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
@@ -2,11 +2,14 @@
package com.yahoo.vespa.http.client.core.communication;
import com.yahoo.vespa.http.client.core.Document;
+import com.yahoo.vespa.http.client.core.EndpointResult;
+import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
@@ -104,9 +107,16 @@ class DocumentQueue {
return previousState;
}
- Document peek() {
+ Optional<Document> pollDocumentIfTimedoutInQueue(long localQueueTimeOut) {
synchronized (queue) {
- return queue.peek();
+ if (queue.isEmpty()) {
+ return Optional.empty();
+ }
+ Document document = queue.peek();
+ if (document.timeInQueueMillis() > localQueueTimeOut) {
+ return Optional.of(queue.poll());
+ }
+ return Optional.empty();
}
}
}
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index a6e0c9092db..b8564817d84 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -14,6 +14,7 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -175,7 +176,7 @@ class IOThread implements Runnable, AutoCloseable {
final List<Document> docsForSendChunk = new ArrayList<>();
int chunkSizeBytes = 0;
try {
- drainFirstDocumentInQueueIfOld();
+ drainFirstDocumentsInQueueIfOld();
Document doc = documentQueue.poll(maxWaitUnits, timeUnit);
if (doc != null) {
docsForSendChunk.add(doc);
@@ -188,7 +189,7 @@ class IOThread implements Runnable, AutoCloseable {
int pendingSize = 1 + resultQueue.getPendingSize();
// see if we can get more documents without blocking
while (chunkSizeBytes < maxChunkSizeBytes && pendingSize < maxInFlightRequests) {
- drainFirstDocumentInQueueIfOld();
+ drainFirstDocumentsInQueueIfOld();
Document d = documentQueue.poll();
if (d == null) {
break;
@@ -300,12 +301,12 @@ class IOThread implements Runnable, AutoCloseable {
try {
if (! client.connect()) {
log.log(Level.WARNING, "Connect returned null " + endpoint);
- drainFirstDocumentInQueueIfOld();
+ drainFirstDocumentsInQueueIfOld();
return ThreadState.DISCONNECTED;
}
return ThreadState.CONNECTED;
} catch (Throwable throwable1) {
- drainFirstDocumentInQueueIfOld();
+ drainFirstDocumentsInQueueIfOld();
log.log(Level.INFO, "Connect did not work out " + endpoint, throwable1);
executeProblemsCounter.incrementAndGet();
return ThreadState.DISCONNECTED;
@@ -317,12 +318,12 @@ class IOThread implements Runnable, AutoCloseable {
} catch (ServerResponseException ser) {
executeProblemsCounter.incrementAndGet();
log.log(Level.INFO, "Handshake did not work out " + endpoint, ser.getMessage());
- drainFirstDocumentInQueueIfOld();
+ drainFirstDocumentsInQueueIfOld();
return ThreadState.CONNECTED;
} catch (Throwable throwable) { // This cover IOException as well
executeProblemsCounter.incrementAndGet();
log.log(Level.INFO, "Problem with Handshake " + endpoint, throwable.getMessage());
- drainFirstDocumentInQueueIfOld();
+ drainFirstDocumentsInQueueIfOld();
client.close();
return ThreadState.DISCONNECTED;
}
@@ -378,23 +379,17 @@ class IOThread implements Runnable, AutoCloseable {
}
-
- private void drainFirstDocumentInQueueIfOld() {
+ private void drainFirstDocumentsInQueueIfOld() {
while (true) {
- Document document = documentQueue.peek();
- if (document == null) {
- return;
- }
- if (document.timeInQueueMillis() > localQueueTimeOut) {
- documentQueue.poll();
- EndpointResult endpointResult = EndPointResultFactory.createTransientError(
- endpoint, document.getOperationId(),
- new Exception("Not sending document operation, timed out in queue after "
- + document.timeInQueueMillis() + " ms."));
- resultQueue.failOperation(endpointResult, clusterId);
- } else {
+ Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
+ if (! document.isPresent()) {
return;
}
+ EndpointResult endpointResult = EndPointResultFactory.createTransientError(
+ endpoint, document.get().getOperationId(),
+ new Exception("Not sending document operation, timed out in queue after "
+ + document.get().timeInQueueMillis() + " ms."));
+ resultQueue.failOperation(endpointResult, clusterId);
}
}