diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-18 16:15:22 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-18 16:15:22 +0200 |
commit | 355e3d35b7213a03fdceff2712863dd13ab16290 (patch) | |
tree | 27cbed03969d59e32c2fc9bf8bbe152626f39b8b /vespa-http-client | |
parent | cd4f5b189210ed4ec24606f935ef9eae688b26fd (diff) |
Use a thread group to avoid any iothreads bumping into a throttled Q.
Diffstat (limited to 'vespa-http-client')
5 files changed, 20 insertions, 18 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java index 67011a24b45..d124475e3a5 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java @@ -87,6 +87,7 @@ public class ClusterConnection implements AutoCloseable { documentQueue = new DocumentQueue(clientQueueSizePerCluster / cluster.getEndpoints().size()); } final IOThread ioThread = new IOThread( + operationProcessor.getIoThreadGroup(), endpointResultQueue, gatewayConnection, clusterId, diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java index 2b746cf56d0..16bf881963f 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java @@ -38,10 +38,10 @@ class DocumentQueue { } } - void put(Document document, boolean self) throws InterruptedException { + void put(Document document, boolean calledFromIoThreadGroup) throws InterruptedException { document.resetQueueTime(); synchronized (queue) { - while (!closed && (queue.size() >= maxSize) && !self) { + while (!closed && (queue.size() >= maxSize) && !calledFromIoThreadGroup) { queue.wait(); } if (closed) { diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java index 328260e6761..618c187be33 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java @@ -36,6 +36,7 @@ class IOThread implements Runnable, AutoCloseable { private final DocumentQueue documentQueue; private final EndpointResultQueue resultQueue; private final Thread thread; + private final ThreadGroup ioThreadGroup; private final int clusterId; private final CountDownLatch running = new CountDownLatch(1); private final CountDownLatch stopSignal = new CountDownLatch(1); @@ -56,6 +57,7 @@ class IOThread implements Runnable, AutoCloseable { private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0); IOThread( + ThreadGroup ioThreadGroup, EndpointResultQueue endpointResultQueue, GatewayConnection client, int clusterId, @@ -72,7 +74,8 @@ class IOThread implements Runnable, AutoCloseable { this.maxChunkSizeBytes = maxChunkSizeBytes; this.maxInFlightRequests = maxInFlightRequests; this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs); - thread = new Thread(this, "IOThread " + endpoint); + this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint); + this.ioThreadGroup = ioThreadGroup; thread.setDaemon(true); this.localQueueTimeOut = localQueueTimeOut; thread.start(); @@ -165,7 +168,7 @@ class IOThread implements Runnable, AutoCloseable { public void post(final Document document) throws InterruptedException { - documentQueue.put(document, Thread.currentThread() == thread); + documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup); } @Override diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 6d39584769c..488fe41d7ff 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -16,9 +16,7 @@ import com.yahoo.vespa.http.client.core.communication.ClusterConnection; import java.math.BigInteger; import java.security.SecureRandom; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -56,6 +54,7 @@ public class OperationProcessor { private final boolean blockOperationsToSameDocument; private int traceCounter = 0; private final boolean traceToStderr; + private final ThreadGroup ioThreadGroup; private final String clientId = new BigInteger(130, random).toString(32); public OperationProcessor( @@ -68,6 +67,7 @@ public class OperationProcessor { this.incompleteResultsThrottler = incompleteResultsThrottler; this.timeoutExecutor = timeoutExecutor; this.blockOperationsToSameDocument = sessionParams.getConnectionParams().isEnableV3Protocol(); + this.ioThreadGroup = new ThreadGroup("operationprocessor"); if (sessionParams.getClusters().isEmpty()) { throw new IllegalArgumentException("Cannot feed to 0 clusters."); @@ -100,6 +100,10 @@ public class OperationProcessor { traceToStderr = sessionParams.getConnectionParams().getPrintTraceToStdErr(); } + public ThreadGroup getIoThreadGroup() { + return ioThreadGroup; + } + public int getIncompleteResultQueueSize() { synchronized (monitor) { return docSendInfoByOperationId.size(); diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java index f2a63895fd4..3143282081b 100644 --- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java +++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java @@ -89,8 +89,7 @@ public class IOThreadTest { (docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8)); when(apacheGatewayConnection.writeOperations(anyObject())).thenReturn(serverResponse); setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage); - try (IOThread ioThread = new IOThread( - endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { + try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { ioThread.post(doc1); assert (latch.await(120, TimeUnit.SECONDS)); } @@ -101,8 +100,7 @@ public class IOThreadTest { when(apacheGatewayConnection.connect()).thenReturn(true); when(apacheGatewayConnection.writeOperations(anyObject())).thenThrow(new IOException(exceptionMessage)); setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage); - try (IOThread ioThread = new IOThread( - endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { + try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { ioThread.post(doc1); assert (latch.await(120, TimeUnit.SECONDS)); } @@ -119,8 +117,7 @@ public class IOThreadTest { latch = new CountDownLatch(2); setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage); - try (IOThread ioThread = new IOThread( - endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { + try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) { ioThread.post(doc1); ioThread.post(doc2); assert (latch.await(120, TimeUnit.SECONDS)); @@ -136,8 +133,7 @@ public class IOThreadTest { .thenReturn(serverResponse); setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, "java.lang.Exception: Not sending document operation, timed out in queue after"); - try (IOThread ioThread = new IOThread( - endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { ioThread.post(doc1); assert (latch.await(120, TimeUnit.SECONDS)); } @@ -152,8 +148,7 @@ public class IOThreadTest { doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake(); Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - try (IOThread ioThread = new IOThread( - endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { ioThread.post(doc1); FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); assertThat(reportedException, instanceOf(FeedProtocolException.class)); @@ -174,8 +169,7 @@ public class IOThreadTest { doThrow(cause).when(apacheGatewayConnection).handshake(); Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue); - try (IOThread ioThread = new IOThread( - endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { + try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) { ioThread.post(doc1); FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS); assertThat(reportedException, instanceOf(FeedConnectException.class)); |