summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2020-09-18 16:03:05 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2020-09-18 16:03:05 +0200
commit849c980b2470bbba975fd3e4a14f43fabdbf9cc2 (patch)
treeb3a74ce89edcc04748416ed83d33e52bdeb83eb5 /vespaclient-container-plugin
parent65586893bb603a6c433b9c0047293ae605e654be (diff)
Reimplement flow control to work correctly with new threadpool model
Throttle http requests when http handler threadpool starts queuing. Always use non-blocking send method on messagebus session. Remove handling of messagebus status code that is never returned.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java98
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java6
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java34
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java2
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java48
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V3CongestionTestCase.java161
6 files changed, 89 insertions, 260 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
index 5548f8fbc1f..c63593b3af6 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
@@ -8,7 +8,6 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.ReferencedResource;
-import java.util.logging.Level;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
@@ -28,10 +27,9 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
import java.util.logging.Logger;
-import static com.yahoo.messagebus.ErrorCode.SEND_QUEUE_FULL;
-
/**
* An instance of this class handles all requests from one client using VespaHttpClient.
*
@@ -51,13 +49,13 @@ class ClientFeederV3 {
private final String clientId;
private final ReplyHandler feedReplyHandler;
private final Metric metric;
+ private final HttpThrottlePolicy httpThrottlePolicy;
private Instant prevOpsPerSecTime = Instant.now();
private double operationsForOpsPerSec = 0d;
private final Object monitor = new Object();
private final StreamReaderV3 streamReaderV3;
private final AtomicInteger ongoingRequests = new AtomicInteger(0);
private final String hostName;
- private final AtomicInteger threadsAvailableForFeeding;
ClientFeederV3(
ReferencedResource<SharedSourceSession> sourceSession,
@@ -66,12 +64,12 @@ class ClientFeederV3 {
String clientId,
Metric metric,
ReplyHandler feedReplyHandler,
- AtomicInteger threadsAvailableForFeeding) {
+ HttpThrottlePolicy httpThrottlePolicy) {
this.sourceSession = sourceSession;
this.clientId = clientId;
this.feedReplyHandler = feedReplyHandler;
this.metric = metric;
- this.threadsAvailableForFeeding = threadsAvailableForFeeding;
+ this.httpThrottlePolicy = httpThrottlePolicy;
this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager);
this.hostName = HostName.getLocalhost();
}
@@ -104,36 +102,17 @@ class ClientFeederV3 {
}
public HttpResponse handleRequest(HttpRequest request) throws IOException {
- threadsAvailableForFeeding.decrementAndGet();
ongoingRequests.incrementAndGet();
try {
FeederSettings feederSettings = new FeederSettings(request);
- /*
- * The gateway handle overload from clients in different ways.
- *
- * If the backend is overloaded, but not the gateway, it will fill the backend, messagebus throttler
- * will start to block new documents and finally all threadsAvailableForFeeding will be blocking.
- * However, as more threads are added, the gateway will not block on messagebus but return
- * transitive errors on the documents that can not be processed. These errors will cause the client(s) to
- * back off a bit.
- *
- * However, we can also have the case that the gateway becomes the bottleneck (e.g. CPU). In this case
- * we need to stop processing of new messages as early as possible and reject the request. This
- * will cause the client(s) to back off for a while. We want some slack before we enter this mode.
- * If we can simply transitively fail each document, it is nicer. Therefor we allow some threads to be
- * busy processing requests with transitive errors before entering this mode. Since we already
- * have flooded the backend, have several threads hanging and waiting for capacity, the number should
- * not be very large. Too much slack can lead to too many threads handling feed and impacting query traffic.
- * We try 10 for now. This should only kick in with very massive feeding to few gateway nodes.
- */
- if (feederSettings.denyIfBusy && threadsAvailableForFeeding.get() < -10) {
+ if (httpThrottlePolicy.shouldThrottle()) {
return new ErrorHttpResponse(getOverloadReturnCode(request), "Gateway overloaded");
}
InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request);
BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>();
try {
- feed(feederSettings, inputStream, replies, threadsAvailableForFeeding);
+ feed(feederSettings, inputStream, replies);
synchronized (monitor) {
// Handshake requests do not have DATA_FORMAT, we do not want to give responses to
// handshakes as it won't be processed by the client.
@@ -146,12 +125,11 @@ class ClientFeederV3 {
} catch (Throwable e) {
log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e);
} finally {
- replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null));
+ replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null));
}
return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName);
} finally {
ongoingRequests.decrementAndGet();
- threadsAvailableForFeeding.incrementAndGet();
}
}
@@ -191,30 +169,14 @@ class ClientFeederV3 {
}
}
- private Result sendMessage(FeederSettings settings,
- DocumentOperationMessageV3 msg,
- AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
- Result result = null;
- while (result == null || result.getError().getCode() == SEND_QUEUE_FULL) {
- msg.getMessage().pushHandler(feedReplyHandler);
-
- if (settings.denyIfBusy && threadsAvailableForFeeding.get() < 1) {
- return sourceSession.getResource().sendMessage(msg.getMessage());
- } else {
- result = sourceSession.getResource().sendMessageBlocking(msg.getMessage());
- }
- if (result.isAccepted()) {
- return result;
- }
- Thread.sleep(100);
- }
- return result;
+ private Result sendMessage(DocumentOperationMessageV3 msg) {
+ msg.getMessage().pushHandler(feedReplyHandler);
+ return sourceSession.getResource().sendMessage(msg.getMessage());
}
private void feed(FeederSettings settings,
InputStream requestInputStream,
- BlockingQueue<OperationStatus> repliesFromOldMessages,
- AtomicInteger threadsAvailableForFeeding) throws InterruptedException {
+ BlockingQueue<OperationStatus> repliesFromOldMessages) {
while (true) {
Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings,
requestInputStream,
@@ -225,13 +187,12 @@ class ClientFeederV3 {
Result result;
try {
- result = sendMessage(settings, message.get(), threadsAvailableForFeeding);
+ result = sendMessage(message.get());
} catch (RuntimeException e) {
repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
Exceptions.toMessageString(e),
ErrorCode.ERROR,
- false,
message.get().getMessage()));
continue;
}
@@ -244,27 +205,21 @@ class ClientFeederV3 {
repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
result.getError().getMessage(),
ErrorCode.TRANSIENT_ERROR,
- false,
- message.get().getMessage()));
+ message.get().getMessage()));
} else {
- // should probably not happen, but everybody knows stuff that
- // shouldn't happen, happens all the time
- boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED;
repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(),
result.getError().getMessage(),
ErrorCode.ERROR,
- isConditionNotMet,
- message.get().getMessage()));
+ message.get().getMessage()));
}
}
}
- private OperationStatus createOperationStatus(String id, String message,
- ErrorCode code, boolean isConditionNotMet, Message msg) {
+ private OperationStatus createOperationStatus(String id, String message, ErrorCode code, Message msg) {
String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0
? msg.getTrace().toString()
: "";
- return new OperationStatus(message, id, code, isConditionNotMet, traceMessage);
+ return new OperationStatus(message, id, code, false, traceMessage);
}
// protected for mocking
@@ -340,4 +295,25 @@ class ClientFeederV3 {
}
}
+ /*
+ * The gateway handle overload from clients in different ways.
+ *
+ * If the backend is overloaded, but not the gateway, it will fill the backend, messagebus throttler
+ * will start to block new documents and finally all threadsAvailableForFeeding will be blocking.
+ * However, as more threads are added, the gateway will not block on messagebus but return
+ * transitive errors on the documents that can not be processed. These errors will cause the client(s) to
+ * back off a bit.
+ *
+ * However, we can also have the case that the gateway becomes the bottleneck (e.g. CPU). In this case
+ * we need to stop processing of new messages as early as possible and reject the request. This
+ * will cause the client(s) to back off for a while. We want some slack before we enter this mode.
+ * If we can simply transitively fail each document, it is nicer. Therefor we allow some threads to be
+ * busy processing requests with transitive errors before entering this mode. Since we already
+ * have flooded the backend, have several threads hanging and waiting for capacity, the number should
+ * not be very large. Too much slack can lead to too many threads handling feed and impacting query traffic.
+ */
+ interface HttpThrottlePolicy {
+ boolean shouldThrottle();
+ }
+
}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
index 7776c8fa34c..fc26034a311 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java
@@ -2,7 +2,6 @@
package com.yahoo.vespa.http.server;
import com.yahoo.collections.Tuple2;
-import com.yahoo.container.handler.ThreadpoolConfig;
import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
@@ -47,11 +46,10 @@ public class FeedHandler extends LoggingRequestHandler {
AccessLog accessLog,
DocumentmanagerConfig documentManagerConfig,
SessionCache sessionCache,
- ThreadpoolConfig threadpoolConfig,
- MetricReceiver metricReceiver) throws Exception {
+ MetricReceiver metricReceiver) {
super(threadpool.executor(), accessLog, metric);
metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server");
- feedHandlerV3 = new FeedHandlerV3(threadpool.executor(), metric, accessLog, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper);
+ feedHandlerV3 = new FeedHandlerV3(threadpool, metric, accessLog, documentManagerConfig, sessionCache, metricsHelper);
feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
index 9bd48b707f8..0f5a808c507 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java
@@ -2,7 +2,7 @@
package com.yahoo.vespa.http.server;
import com.yahoo.concurrent.ThreadFactoryFactory;
-import com.yahoo.container.handler.ThreadpoolConfig;
+import com.yahoo.container.handler.threadpool.ContainerThreadPool;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.jdisc.LoggingRequestHandler;
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -44,31 +43,40 @@ public class FeedHandlerV3 extends LoggingRequestHandler {
private final SessionCache sessionCache;
protected final ReplyHandler feedReplyHandler;
private final Metric metric;
+ private final ClientFeederV3.HttpThrottlePolicy httpThrottlePolicy;
private final Object monitor = new Object();
- private final AtomicInteger threadsAvailableForFeeding;
private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName());
- public FeedHandlerV3(Executor executor,
+ public FeedHandlerV3(ContainerThreadPool threadpool,
Metric metric,
AccessLog accessLog,
DocumentmanagerConfig documentManagerConfig,
SessionCache sessionCache,
- ThreadpoolConfig threadpoolConfig,
DocumentApiMetrics metricsHelper) {
+ this(threadpool.executor(),
+ () -> threadpool.queuedTasks() > 0,
+ metric,
+ accessLog,
+ documentManagerConfig,
+ sessionCache,
+ metricsHelper);
+ }
+
+ FeedHandlerV3(Executor executor,
+ ClientFeederV3.HttpThrottlePolicy httpThrottlePolicy,
+ Metric metric,
+ AccessLog accessLog,
+ DocumentmanagerConfig documentManagerConfig,
+ SessionCache sessionCache,
+ DocumentApiMetrics metricsHelper) {
super(executor, accessLog, metric);
+ this.httpThrottlePolicy = httpThrottlePolicy;
docTypeManager = new DocumentTypeManager(documentManagerConfig);
this.sessionCache = sessionCache;
feedReplyHandler = new FeedReplyReader(metric, metricsHelper);
cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feedhandlerv3.cron"));
cron.scheduleWithFixedDelay(this::removeOldClients, 16, 11, TimeUnit.MINUTES);
this.metric = metric;
- // 40% of the threads can be blocking on feeding before we deny requests.
- if (threadpoolConfig != null) {
- threadsAvailableForFeeding = new AtomicInteger(Math.max((int) (0.4 * threadpoolConfig.maxthreads()), 1));
- } else {
- log.warning("No config for threadpool, using 200 for max blocking threads for feeding.");
- threadsAvailableForFeeding = new AtomicInteger(200);
- }
}
public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) {
@@ -91,7 +99,7 @@ public class FeedHandlerV3 extends LoggingRequestHandler {
clientId,
metric,
feedReplyHandler,
- threadsAvailableForFeeding));
+ httpThrottlePolicy));
}
clientFeederV3 = clientFeederByClientId.get(clientId);
}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
index 909c643a006..0e112c49c37 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java
@@ -18,7 +18,6 @@ public class FeederSettings {
private static final Route DEFAULT_ROUTE = Route.parse("default");
public final boolean drain; // TODO: Implement drain=true
public final Route route;
- public final boolean denyIfBusy;
public final DataFormat dataFormat;
public final String priority;
public final Integer traceLevel;
@@ -26,7 +25,6 @@ public class FeederSettings {
public FeederSettings(HttpRequest request) {
this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false);
this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE);
- this.denyIfBusy = Optional.ofNullable(request.getHeader(Headers.DENY_IF_BUSY)).map(Boolean::parseBoolean).orElse(false);
// TODO: Change default to JSON on Vespa 8:
this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.XML_UTF8);
this.priority = request.getHeader(Headers.PRIORITY);
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java
index 4b16822f804..af6b82b472d 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java
@@ -2,7 +2,6 @@
package com.yahoo.vespa.http.server;
import com.google.common.base.Splitter;
-import com.yahoo.container.handler.ThreadpoolConfig;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.jdisc.messagebus.SessionCache;
@@ -23,6 +22,7 @@ import com.yahoo.vespa.http.client.config.FeedParams;
import com.yahoo.vespa.http.client.core.ErrorCode;
import com.yahoo.vespa.http.client.core.Headers;
import com.yahoo.vespa.http.client.core.OperationStatus;
+import com.yahoo.vespa.http.server.ClientFeederV3.HttpThrottlePolicy;
import org.junit.Test;
import org.mockito.stubbing.Answer;
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -42,9 +43,12 @@ import static org.mockito.Mockito.when;
public class FeedTesterV3 {
final CollectingMetric metric = new CollectingMetric();
+ private static final HttpThrottlePolicy NON_THROTTLE = () -> false;
+ private static final HttpThrottlePolicy THROTTLE_ALWAYS = () -> true;
+
@Test
public void feedOneDocument() throws Exception {
- final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(null);
+ final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(NON_THROTTLE);
HttpResponse httpResponse = feedHandlerV3.handle(createRequest(1));
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
httpResponse.render(outStream);
@@ -54,7 +58,7 @@ public class FeedTesterV3 {
@Test
public void feedOneBrokenDocument() throws Exception {
- final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(null);
+ final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(NON_THROTTLE);
HttpResponse httpResponse = feedHandlerV3.handle(createBrokenRequest());
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
httpResponse.render(outStream);
@@ -65,7 +69,7 @@ public class FeedTesterV3 {
@Test
public void feedManyDocument() throws Exception {
- final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(null);
+ final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(NON_THROTTLE);
HttpResponse httpResponse = feedHandlerV3.handle(createRequest(100));
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
httpResponse.render(outStream);
@@ -74,6 +78,15 @@ public class FeedTesterV3 {
assertThat(Splitter.on("\n").splitToList(result).size(), is(101));
}
+ @Test
+ public void response_has_status_code_429_when_throttling() throws Exception {
+ final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(THROTTLE_ALWAYS);
+ HttpResponse httpResponse = feedHandlerV3.handle(createRequest(100));
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ httpResponse.render(outStream);
+ assertEquals(httpResponse.getStatus(), 429);
+ }
+
private static DocumentTypeManager createDoctypeManager() {
DocumentTypeManager docTypeManager = new DocumentTypeManager();
DocumentType documentType = new DocumentType("testdocument");
@@ -113,35 +126,32 @@ public class FeedTesterV3 {
return request;
}
- private FeedHandlerV3 setupFeederHandler(ThreadpoolConfig threadPoolConfig) throws Exception {
+ private FeedHandlerV3 setupFeederHandler(HttpThrottlePolicy policy) throws Exception {
Executor threadPool = Executors.newCachedThreadPool();
+
DocumentmanagerConfig docMan = new DocumentmanagerConfig(new DocumentmanagerConfig.Builder().enablecompression(true));
FeedHandlerV3 feedHandlerV3 = new FeedHandlerV3(
threadPool,
+ policy,
metric,
AccessLog.voidAccessLog(),
docMan,
null /* session cache */,
- threadPoolConfig /* thread pool config */,
new DocumentApiMetrics(MetricReceiver.nullImplementation, "test")) {
@Override
protected ReferencedResource<SharedSourceSession> retainSource(
SessionCache sessionCache, SourceSessionParams sessionParams) {
SharedSourceSession sharedSourceSession = mock(SharedSourceSession.class);
- try {
- when(sharedSourceSession.sendMessageBlocking(any())).thenAnswer((Answer<?>) invocation -> {
- Object[] args = invocation.getArguments();
- PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0];
- ReplyContext replyContext = (ReplyContext)putDocumentMessage.getContext();
- replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace"));
- Result result = mock(Result.class);
- when(result.isAccepted()).thenReturn(true);
- return result;
- });
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ when(sharedSourceSession.sendMessage(any())).thenAnswer((Answer<?>) invocation -> {
+ Object[] args = invocation.getArguments();
+ PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0];
+ ReplyContext replyContext = (ReplyContext)putDocumentMessage.getContext();
+ replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace"));
+ Result result = mock(Result.class);
+ when(result.isAccepted()).thenReturn(true);
+ return result;
+ });
Result result = mock(Result.class);
when(result.isAccepted()).thenReturn(true);
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V3CongestionTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V3CongestionTestCase.java
deleted file mode 100644
index 04b66480a82..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V3CongestionTestCase.java
+++ /dev/null
@@ -1,161 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.http.server;
-
-import com.yahoo.container.jdisc.HttpRequest;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.documentapi.metrics.DocumentApiMetrics;
-import com.yahoo.jdisc.Metric;
-import com.yahoo.jdisc.ReferencedResource;
-import com.yahoo.jdisc.References;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.MessageBus;
-import com.yahoo.messagebus.MessageBusParams;
-import com.yahoo.messagebus.ReplyHandler;
-import com.yahoo.messagebus.Result;
-import com.yahoo.messagebus.SourceSessionParams;
-import com.yahoo.messagebus.shared.SharedMessageBus;
-import com.yahoo.messagebus.shared.SharedSourceSession;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.vespa.http.client.core.Headers;
-import com.yahoo.vespaxmlparser.FeedOperation;
-import com.yahoo.vespaxmlparser.MockFeedReaderFactory;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertTrue;
-
-
-public class V3CongestionTestCase {
- AtomicInteger threadsAvail = new AtomicInteger(10);
- AtomicInteger requests = new AtomicInteger(0);
-
-
- static class ClientFeederWithMocks extends ClientFeederV3 {
-
- private final DocumentOperationMessageV3 docOp;
-
- ClientFeederWithMocks(ReferencedResource<SharedSourceSession> sourceSession, FeedReaderFactory feedReaderFactory, DocumentTypeManager docTypeManager, String clientId, Metric metric, ReplyHandler feedReplyHandler, AtomicInteger threadsAvailableForFeeding) {
- super(sourceSession, feedReaderFactory, docTypeManager, clientId, metric, feedReplyHandler, threadsAvailableForFeeding);
- // The operation to return from the client feeder.
- docOp = DocumentOperationMessageV3.newRemoveMessage(FeedOperation.INVALID, "operation id");
-
- }
-
- @Override
- protected DocumentOperationMessageV3 getNextMessage(
- String operationId, InputStream requestInputStream, FeederSettings settings) throws Exception {
- while (true) {
- int data = requestInputStream.read();
- if (data == -1 || data == (char)'\n') {
- break;
- }
- }
- return docOp;
- }
- }
-
- final static int NUMBER_OF_QUEUE_FULL_RESPONSES = 5;
-
- ClientFeederV3 clientFeederV3;
- HttpRequest request;
-
- @Before
- public void setup() {
- // Set up a request to be used from the tests.
- InputStream in = new MetaStream(new byte[] { 1 });
- request = HttpRequest
- .createTestRequest(
- "http://foo.bar:19020/reserved-for-internal-use/feedapi",
- com.yahoo.jdisc.http.HttpRequest.Method.POST, in);
- request.getJDiscRequest().headers().add(Headers.VERSION, "3");
- request.getJDiscRequest().headers().add(Headers.CLIENT_ID, "clientId");
-
-
- // Create a mock that does not parse the message, only reads the rest of the line. Makes it easier
- // to write tests. It uses a mock for message bus.
- clientFeederV3 = new ClientFeederWithMocks(
- retainMockSession(new SourceSessionParams(), requests),
- new MockFeedReaderFactory(),
- null /*DocTypeManager*/,
- "clientID",
- null/*metric*/,
- new FeedReplyReader(null/*metric*/, new DocumentApiMetrics(MetricReceiver.nullImplementation, "tester")),
- threadsAvail);
- }
-
- // A mock for message bus that can simulate blocking requests.
- private static class MockSharedSession extends SharedSourceSession {
- boolean queuFull = true;
- AtomicInteger requests;
-
- public MockSharedSession(SourceSessionParams params, AtomicInteger requests) {
- super(new SharedMessageBus(new MessageBus(new MockNetwork(),
- new MessageBusParams())), params);
- this.requests = requests;
- }
-
- @Override
- public Result sendMessageBlocking(Message msg) throws InterruptedException {
- return sendMessage(msg);
- }
-
- @Override
- public Result sendMessage(Message msg) {
- ReplyHandler handler = msg.popHandler();
- if (queuFull) {
- requests.incrementAndGet();
- // Disable queue full after some attempts
- if (requests.get() == NUMBER_OF_QUEUE_FULL_RESPONSES) {
- queuFull = false;
- }
- Result r = new Result(ErrorCode.SEND_QUEUE_FULL, "queue full");
- return r;
- }
-
- handler.handleReply(new MockReply(msg.getContext()));
- return Result.ACCEPTED;
- }
- }
-
- ReferencedResource<SharedSourceSession> retainMockSession(
- SourceSessionParams sessionParams,
- AtomicInteger requests) {
- final SharedSourceSession session = new MockSharedSession(sessionParams, requests);
- return new ReferencedResource<>(session, References.fromResource(session));
- }
-
- @Test
- public void testRetriesWhenThreadsAvailable() throws IOException {
- request.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true");
- threadsAvail.set(10);
-
- clientFeederV3.handleRequest(request);
- assertTrue(requests.get() == NUMBER_OF_QUEUE_FULL_RESPONSES);
- }
-
- @Test
- public void testNoRetriesWhenNoThreadsAvailable() throws IOException {
- request.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true");
- threadsAvail.set(0);
-
- clientFeederV3.handleRequest(request);
- assertTrue(requests.get() == 1);
- }
-
- @Test
- public void testRetriesWhenNoThreadsAvailableButNoDenyIfBusy() throws IOException {
- request.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "false");
- threadsAvail.set(0);
-
- clientFeederV3.handleRequest(request);
- assertTrue(requests.get() == NUMBER_OF_QUEUE_FULL_RESPONSES);
- }
-}