diff options
author | Haakon Dybdahl <dybdahl@yahoo-inc.com> | 2017-03-15 10:47:19 +0100 |
---|---|---|
committer | Haakon Dybdahl <dybdahl@yahoo-inc.com> | 2017-03-15 10:47:19 +0100 |
commit | 6c135a1a15306e13c3797aea5352290ec9a6fec0 (patch) | |
tree | 6ee6cb8b730f6ea0861789deed01f979a9cb0c6e /vespa-http-client | |
parent | a563306957b180c933e5e2ff68a989b32f02aae7 (diff) |
Fix race when two threads are competing for head element in document queue.
Diffstat (limited to 'vespa-http-client')
2 files changed, 27 insertions, 22 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java index cefa8d6e94a..edbd367d186 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java @@ -2,11 +2,14 @@ package com.yahoo.vespa.http.client.core.communication; import com.yahoo.vespa.http.client.core.Document; +import com.yahoo.vespa.http.client.core.EndpointResult; +import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; /** @@ -104,9 +107,16 @@ class DocumentQueue { return previousState; } - Document peek() { + Optional<Document> pollDocumentIfTimedoutInQueue(long localQueueTimeOut) { synchronized (queue) { - return queue.peek(); + if (queue.isEmpty()) { + return Optional.empty(); + } + Document document = queue.peek(); + if (document.timeInQueueMillis() > localQueueTimeOut) { + return Optional.of(queue.poll()); + } + return Optional.empty(); } } } diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index a6e0c9092db..b8564817d84 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -14,6 +14,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -175,7 +176,7 @@ class IOThread implements Runnable, AutoCloseable { final List<Document> docsForSendChunk = new ArrayList<>(); int chunkSizeBytes = 0; try { - drainFirstDocumentInQueueIfOld(); + drainFirstDocumentsInQueueIfOld(); Document doc = documentQueue.poll(maxWaitUnits, timeUnit); if (doc != null) { docsForSendChunk.add(doc); @@ -188,7 +189,7 @@ class IOThread implements Runnable, AutoCloseable { int pendingSize = 1 + resultQueue.getPendingSize(); // see if we can get more documents without blocking while (chunkSizeBytes < maxChunkSizeBytes && pendingSize < maxInFlightRequests) { - drainFirstDocumentInQueueIfOld(); + drainFirstDocumentsInQueueIfOld(); Document d = documentQueue.poll(); if (d == null) { break; @@ -300,12 +301,12 @@ class IOThread implements Runnable, AutoCloseable { try { if (! client.connect()) { log.log(Level.WARNING, "Connect returned null " + endpoint); - drainFirstDocumentInQueueIfOld(); + drainFirstDocumentsInQueueIfOld(); return ThreadState.DISCONNECTED; } return ThreadState.CONNECTED; } catch (Throwable throwable1) { - drainFirstDocumentInQueueIfOld(); + drainFirstDocumentsInQueueIfOld(); log.log(Level.INFO, "Connect did not work out " + endpoint, throwable1); executeProblemsCounter.incrementAndGet(); return ThreadState.DISCONNECTED; @@ -317,12 +318,12 @@ class IOThread implements Runnable, AutoCloseable { } catch (ServerResponseException ser) { executeProblemsCounter.incrementAndGet(); log.log(Level.INFO, "Handshake did not work out " + endpoint, ser.getMessage()); - drainFirstDocumentInQueueIfOld(); + drainFirstDocumentsInQueueIfOld(); return ThreadState.CONNECTED; } catch (Throwable throwable) { // This cover IOException as well executeProblemsCounter.incrementAndGet(); log.log(Level.INFO, "Problem with Handshake " + endpoint, throwable.getMessage()); - drainFirstDocumentInQueueIfOld(); + drainFirstDocumentsInQueueIfOld(); client.close(); return ThreadState.DISCONNECTED; } @@ -378,23 +379,17 @@ class IOThread implements Runnable, AutoCloseable { } - - private void drainFirstDocumentInQueueIfOld() { + private void drainFirstDocumentsInQueueIfOld() { while (true) { - Document document = documentQueue.peek(); - if (document == null) { - return; - } - if (document.timeInQueueMillis() > localQueueTimeOut) { - documentQueue.poll(); - EndpointResult endpointResult = EndPointResultFactory.createTransientError( - endpoint, document.getOperationId(), - new Exception("Not sending document operation, timed out in queue after " - + document.timeInQueueMillis() + " ms.")); - resultQueue.failOperation(endpointResult, clusterId); - } else { + Optional<Document> document = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut); + if (! document.isPresent()) { return; } + EndpointResult endpointResult = EndPointResultFactory.createTransientError( + endpoint, document.get().getOperationId(), + new Exception("Not sending document operation, timed out in queue after " + + document.get().timeInQueueMillis() + " ms.")); + resultQueue.failOperation(endpointResult, clusterId); } } |