summaryrefslogtreecommitdiffstats
path: root/vespa-http-client
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-10-18 16:15:22 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-10-18 16:15:22 +0200
commit355e3d35b7213a03fdceff2712863dd13ab16290 (patch)
tree27cbed03969d59e32c2fc9bf8bbe152626f39b8b /vespa-http-client
parentcd4f5b189210ed4ec24606f935ef9eae688b26fd (diff)
Use a thread group to avoid any iothreads bumping into a throttled Q.
Diffstat (limited to 'vespa-http-client')
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java4
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java7
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java8
-rw-r--r--vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java18
5 files changed, 20 insertions, 18 deletions
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
index 67011a24b45..d124475e3a5 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ClusterConnection.java
@@ -87,6 +87,7 @@ public class ClusterConnection implements AutoCloseable {
documentQueue = new DocumentQueue(clientQueueSizePerCluster / cluster.getEndpoints().size());
}
final IOThread ioThread = new IOThread(
+ operationProcessor.getIoThreadGroup(),
endpointResultQueue,
gatewayConnection,
clusterId,
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
index 2b746cf56d0..16bf881963f 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/DocumentQueue.java
@@ -38,10 +38,10 @@ class DocumentQueue {
}
}
- void put(Document document, boolean self) throws InterruptedException {
+ void put(Document document, boolean calledFromIoThreadGroup) throws InterruptedException {
document.resetQueueTime();
synchronized (queue) {
- while (!closed && (queue.size() >= maxSize) && !self) {
+ while (!closed && (queue.size() >= maxSize) && !calledFromIoThreadGroup) {
queue.wait();
}
if (closed) {
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
index 328260e6761..618c187be33 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/IOThread.java
@@ -36,6 +36,7 @@ class IOThread implements Runnable, AutoCloseable {
private final DocumentQueue documentQueue;
private final EndpointResultQueue resultQueue;
private final Thread thread;
+ private final ThreadGroup ioThreadGroup;
private final int clusterId;
private final CountDownLatch running = new CountDownLatch(1);
private final CountDownLatch stopSignal = new CountDownLatch(1);
@@ -56,6 +57,7 @@ class IOThread implements Runnable, AutoCloseable {
private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);
IOThread(
+ ThreadGroup ioThreadGroup,
EndpointResultQueue endpointResultQueue,
GatewayConnection client,
int clusterId,
@@ -72,7 +74,8 @@ class IOThread implements Runnable, AutoCloseable {
this.maxChunkSizeBytes = maxChunkSizeBytes;
this.maxInFlightRequests = maxInFlightRequests;
this.gatewayThrottler = new GatewayThrottler(maxSleepTimeMs);
- thread = new Thread(this, "IOThread " + endpoint);
+ this.thread = new Thread(ioThreadGroup, this, "IOThread " + endpoint);
+ this.ioThreadGroup = ioThreadGroup;
thread.setDaemon(true);
this.localQueueTimeOut = localQueueTimeOut;
thread.start();
@@ -165,7 +168,7 @@ class IOThread implements Runnable, AutoCloseable {
public void post(final Document document) throws InterruptedException {
- documentQueue.put(document, Thread.currentThread() == thread);
+ documentQueue.put(document, Thread.currentThread().getThreadGroup() == ioThreadGroup);
}
@Override
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 6d39584769c..488fe41d7ff 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -16,9 +16,7 @@ import com.yahoo.vespa.http.client.core.communication.ClusterConnection;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -56,6 +54,7 @@ public class OperationProcessor {
private final boolean blockOperationsToSameDocument;
private int traceCounter = 0;
private final boolean traceToStderr;
+ private final ThreadGroup ioThreadGroup;
private final String clientId = new BigInteger(130, random).toString(32);
public OperationProcessor(
@@ -68,6 +67,7 @@ public class OperationProcessor {
this.incompleteResultsThrottler = incompleteResultsThrottler;
this.timeoutExecutor = timeoutExecutor;
this.blockOperationsToSameDocument = sessionParams.getConnectionParams().isEnableV3Protocol();
+ this.ioThreadGroup = new ThreadGroup("operationprocessor");
if (sessionParams.getClusters().isEmpty()) {
throw new IllegalArgumentException("Cannot feed to 0 clusters.");
@@ -100,6 +100,10 @@ public class OperationProcessor {
traceToStderr = sessionParams.getConnectionParams().getPrintTraceToStdErr();
}
+ public ThreadGroup getIoThreadGroup() {
+ return ioThreadGroup;
+ }
+
public int getIncompleteResultQueueSize() {
synchronized (monitor) {
return docSendInfoByOperationId.size();
diff --git a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
index f2a63895fd4..3143282081b 100644
--- a/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
+++ b/vespa-http-client/src/test/java/com/yahoo/vespa/http/client/core/communication/IOThreadTest.java
@@ -89,8 +89,7 @@ public class IOThreadTest {
(docId1 + " OK Doc{20}fed").getBytes(StandardCharsets.UTF_8));
when(apacheGatewayConnection.writeOperations(anyObject())).thenReturn(serverResponse);
setupEndpointResultQueueMock( "nope", docId1, true, exceptionMessage);
- try (IOThread ioThread = new IOThread(
- endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
ioThread.post(doc1);
assert (latch.await(120, TimeUnit.SECONDS));
}
@@ -101,8 +100,7 @@ public class IOThreadTest {
when(apacheGatewayConnection.connect()).thenReturn(true);
when(apacheGatewayConnection.writeOperations(anyObject())).thenThrow(new IOException(exceptionMessage));
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true, exceptionMessage);
- try (IOThread ioThread = new IOThread(
- endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
ioThread.post(doc1);
assert (latch.await(120, TimeUnit.SECONDS));
}
@@ -119,8 +117,7 @@ public class IOThreadTest {
latch = new CountDownLatch(2);
setupEndpointResultQueueMock(doc1.getOperationId(), doc2.getDocumentId(), true, exceptionMessage);
- try (IOThread ioThread = new IOThread(
- endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
+ try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10000, 10000L, documentQueue, 0)) {
ioThread.post(doc1);
ioThread.post(doc2);
assert (latch.await(120, TimeUnit.SECONDS));
@@ -136,8 +133,7 @@ public class IOThreadTest {
.thenReturn(serverResponse);
setupEndpointResultQueueMock(doc1.getOperationId(), "nope", true,
"java.lang.Exception: Not sending document operation, timed out in queue after");
- try (IOThread ioThread = new IOThread(
- endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
ioThread.post(doc1);
assert (latch.await(120, TimeUnit.SECONDS));
}
@@ -152,8 +148,7 @@ public class IOThreadTest {
doThrow(new ServerResponseException(errorCode, errorMessage)).when(apacheGatewayConnection).handshake();
Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
- try (IOThread ioThread = new IOThread(
- endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
ioThread.post(doc1);
FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
assertThat(reportedException, instanceOf(FeedProtocolException.class));
@@ -174,8 +169,7 @@ public class IOThreadTest {
doThrow(cause).when(apacheGatewayConnection).handshake();
Future<FeedEndpointException> futureException = endpointErrorCapturer(endpointResultQueue);
- try (IOThread ioThread = new IOThread(
- endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
+ try (IOThread ioThread = new IOThread(null, endpointResultQueue, apacheGatewayConnection, 0, 0, 10, 10L, documentQueue, 0)) {
ioThread.post(doc1);
FeedEndpointException reportedException = futureException.get(120, TimeUnit.SECONDS);
assertThat(reportedException, instanceOf(FeedConnectException.class));