diff options
author | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-18 16:03:05 +0200 |
---|---|---|
committer | Bjørn Christian Seime <bjorncs@verizonmedia.com> | 2020-09-18 16:03:05 +0200 |
commit | 849c980b2470bbba975fd3e4a14f43fabdbf9cc2 (patch) | |
tree | b3a74ce89edcc04748416ed83d33e52bdeb83eb5 /vespaclient-container-plugin | |
parent | 65586893bb603a6c433b9c0047293ae605e654be (diff) |
Reimplement flow control to work correctly with new threadpool model
Throttle http requests when http handler threadpool starts queuing.
Always use non-blocking send method on messagebus session.
Remove handling of messagebus status code that is never returned.
Diffstat (limited to 'vespaclient-container-plugin')
6 files changed, 89 insertions, 260 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java index 5548f8fbc1f..c63593b3af6 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java @@ -8,7 +8,6 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.ReferencedResource; -import java.util.logging.Level; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.Result; @@ -28,10 +27,9 @@ import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; import java.util.logging.Logger; -import static com.yahoo.messagebus.ErrorCode.SEND_QUEUE_FULL; - /** * An instance of this class handles all requests from one client using VespaHttpClient. * @@ -51,13 +49,13 @@ class ClientFeederV3 { private final String clientId; private final ReplyHandler feedReplyHandler; private final Metric metric; + private final HttpThrottlePolicy httpThrottlePolicy; private Instant prevOpsPerSecTime = Instant.now(); private double operationsForOpsPerSec = 0d; private final Object monitor = new Object(); private final StreamReaderV3 streamReaderV3; private final AtomicInteger ongoingRequests = new AtomicInteger(0); private final String hostName; - private final AtomicInteger threadsAvailableForFeeding; ClientFeederV3( ReferencedResource<SharedSourceSession> sourceSession, @@ -66,12 +64,12 @@ class ClientFeederV3 { String clientId, Metric metric, ReplyHandler feedReplyHandler, - AtomicInteger threadsAvailableForFeeding) { + HttpThrottlePolicy httpThrottlePolicy) { this.sourceSession = sourceSession; this.clientId = clientId; this.feedReplyHandler = feedReplyHandler; this.metric = metric; - this.threadsAvailableForFeeding = threadsAvailableForFeeding; + this.httpThrottlePolicy = httpThrottlePolicy; this.streamReaderV3 = new StreamReaderV3(feedReaderFactory, docTypeManager); this.hostName = HostName.getLocalhost(); } @@ -104,36 +102,17 @@ class ClientFeederV3 { } public HttpResponse handleRequest(HttpRequest request) throws IOException { - threadsAvailableForFeeding.decrementAndGet(); ongoingRequests.incrementAndGet(); try { FeederSettings feederSettings = new FeederSettings(request); - /* - * The gateway handle overload from clients in different ways. - * - * If the backend is overloaded, but not the gateway, it will fill the backend, messagebus throttler - * will start to block new documents and finally all threadsAvailableForFeeding will be blocking. - * However, as more threads are added, the gateway will not block on messagebus but return - * transitive errors on the documents that can not be processed. These errors will cause the client(s) to - * back off a bit. - * - * However, we can also have the case that the gateway becomes the bottleneck (e.g. CPU). In this case - * we need to stop processing of new messages as early as possible and reject the request. This - * will cause the client(s) to back off for a while. We want some slack before we enter this mode. - * If we can simply transitively fail each document, it is nicer. Therefor we allow some threads to be - * busy processing requests with transitive errors before entering this mode. Since we already - * have flooded the backend, have several threads hanging and waiting for capacity, the number should - * not be very large. Too much slack can lead to too many threads handling feed and impacting query traffic. - * We try 10 for now. This should only kick in with very massive feeding to few gateway nodes. - */ - if (feederSettings.denyIfBusy && threadsAvailableForFeeding.get() < -10) { + if (httpThrottlePolicy.shouldThrottle()) { return new ErrorHttpResponse(getOverloadReturnCode(request), "Gateway overloaded"); } InputStream inputStream = StreamReaderV3.unzipStreamIfNeeded(request); BlockingQueue<OperationStatus> replies = new LinkedBlockingQueue<>(); try { - feed(feederSettings, inputStream, replies, threadsAvailableForFeeding); + feed(feederSettings, inputStream, replies); synchronized (monitor) { // Handshake requests do not have DATA_FORMAT, we do not want to give responses to // handshakes as it won't be processed by the client. @@ -146,12 +125,11 @@ class ClientFeederV3 { } catch (Throwable e) { log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e); } finally { - replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, false, null)); + replies.add(createOperationStatus("-", "-", ErrorCode.END_OF_FEED, null)); } return new FeedResponse(200, replies, 3, clientId, outstandingOperations.get(), hostName); } finally { ongoingRequests.decrementAndGet(); - threadsAvailableForFeeding.incrementAndGet(); } } @@ -191,30 +169,14 @@ class ClientFeederV3 { } } - private Result sendMessage(FeederSettings settings, - DocumentOperationMessageV3 msg, - AtomicInteger threadsAvailableForFeeding) throws InterruptedException { - Result result = null; - while (result == null || result.getError().getCode() == SEND_QUEUE_FULL) { - msg.getMessage().pushHandler(feedReplyHandler); - - if (settings.denyIfBusy && threadsAvailableForFeeding.get() < 1) { - return sourceSession.getResource().sendMessage(msg.getMessage()); - } else { - result = sourceSession.getResource().sendMessageBlocking(msg.getMessage()); - } - if (result.isAccepted()) { - return result; - } - Thread.sleep(100); - } - return result; + private Result sendMessage(DocumentOperationMessageV3 msg) { + msg.getMessage().pushHandler(feedReplyHandler); + return sourceSession.getResource().sendMessage(msg.getMessage()); } private void feed(FeederSettings settings, InputStream requestInputStream, - BlockingQueue<OperationStatus> repliesFromOldMessages, - AtomicInteger threadsAvailableForFeeding) throws InterruptedException { + BlockingQueue<OperationStatus> repliesFromOldMessages) { while (true) { Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings, requestInputStream, @@ -225,13 +187,12 @@ class ClientFeederV3 { Result result; try { - result = sendMessage(settings, message.get(), threadsAvailableForFeeding); + result = sendMessage(message.get()); } catch (RuntimeException e) { repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), Exceptions.toMessageString(e), ErrorCode.ERROR, - false, message.get().getMessage())); continue; } @@ -244,27 +205,21 @@ class ClientFeederV3 { repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), result.getError().getMessage(), ErrorCode.TRANSIENT_ERROR, - false, - message.get().getMessage())); + message.get().getMessage())); } else { - // should probably not happen, but everybody knows stuff that - // shouldn't happen, happens all the time - boolean isConditionNotMet = result.getError().getCode() == DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED; repliesFromOldMessages.add(createOperationStatus(message.get().getOperationId(), result.getError().getMessage(), ErrorCode.ERROR, - isConditionNotMet, - message.get().getMessage())); + message.get().getMessage())); } } } - private OperationStatus createOperationStatus(String id, String message, - ErrorCode code, boolean isConditionNotMet, Message msg) { + private OperationStatus createOperationStatus(String id, String message, ErrorCode code, Message msg) { String traceMessage = msg != null && msg.getTrace() != null && msg.getTrace().getLevel() > 0 ? msg.getTrace().toString() : ""; - return new OperationStatus(message, id, code, isConditionNotMet, traceMessage); + return new OperationStatus(message, id, code, false, traceMessage); } // protected for mocking @@ -340,4 +295,25 @@ class ClientFeederV3 { } } + /* + * The gateway handle overload from clients in different ways. + * + * If the backend is overloaded, but not the gateway, it will fill the backend, messagebus throttler + * will start to block new documents and finally all threadsAvailableForFeeding will be blocking. + * However, as more threads are added, the gateway will not block on messagebus but return + * transitive errors on the documents that can not be processed. These errors will cause the client(s) to + * back off a bit. + * + * However, we can also have the case that the gateway becomes the bottleneck (e.g. CPU). In this case + * we need to stop processing of new messages as early as possible and reject the request. This + * will cause the client(s) to back off for a while. We want some slack before we enter this mode. + * If we can simply transitively fail each document, it is nicer. Therefor we allow some threads to be + * busy processing requests with transitive errors before entering this mode. Since we already + * have flooded the backend, have several threads hanging and waiting for capacity, the number should + * not be very large. Too much slack can lead to too many threads handling feed and impacting query traffic. + */ + interface HttpThrottlePolicy { + boolean shouldThrottle(); + } + } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java index 7776c8fa34c..fc26034a311 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandler.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.http.server; import com.yahoo.collections.Tuple2; -import com.yahoo.container.handler.ThreadpoolConfig; import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; @@ -47,11 +46,10 @@ public class FeedHandler extends LoggingRequestHandler { AccessLog accessLog, DocumentmanagerConfig documentManagerConfig, SessionCache sessionCache, - ThreadpoolConfig threadpoolConfig, - MetricReceiver metricReceiver) throws Exception { + MetricReceiver metricReceiver) { super(threadpool.executor(), accessLog, metric); metricsHelper = new DocumentApiMetrics(metricReceiver, "vespa.http.server"); - feedHandlerV3 = new FeedHandlerV3(threadpool.executor(), metric, accessLog, documentManagerConfig, sessionCache, threadpoolConfig, metricsHelper); + feedHandlerV3 = new FeedHandlerV3(threadpool, metric, accessLog, documentManagerConfig, sessionCache, metricsHelper); feedReplyHandler = new FeedReplyReader(metric, metricsHelper); } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java index 9bd48b707f8..0f5a808c507 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeedHandlerV3.java @@ -2,7 +2,7 @@ package com.yahoo.vespa.http.server; import com.yahoo.concurrent.ThreadFactoryFactory; -import com.yahoo.container.handler.ThreadpoolConfig; +import com.yahoo.container.handler.threadpool.ContainerThreadPool; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.jdisc.LoggingRequestHandler; @@ -25,7 +25,6 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -44,31 +43,40 @@ public class FeedHandlerV3 extends LoggingRequestHandler { private final SessionCache sessionCache; protected final ReplyHandler feedReplyHandler; private final Metric metric; + private final ClientFeederV3.HttpThrottlePolicy httpThrottlePolicy; private final Object monitor = new Object(); - private final AtomicInteger threadsAvailableForFeeding; private static final Logger log = Logger.getLogger(FeedHandlerV3.class.getName()); - public FeedHandlerV3(Executor executor, + public FeedHandlerV3(ContainerThreadPool threadpool, Metric metric, AccessLog accessLog, DocumentmanagerConfig documentManagerConfig, SessionCache sessionCache, - ThreadpoolConfig threadpoolConfig, DocumentApiMetrics metricsHelper) { + this(threadpool.executor(), + () -> threadpool.queuedTasks() > 0, + metric, + accessLog, + documentManagerConfig, + sessionCache, + metricsHelper); + } + + FeedHandlerV3(Executor executor, + ClientFeederV3.HttpThrottlePolicy httpThrottlePolicy, + Metric metric, + AccessLog accessLog, + DocumentmanagerConfig documentManagerConfig, + SessionCache sessionCache, + DocumentApiMetrics metricsHelper) { super(executor, accessLog, metric); + this.httpThrottlePolicy = httpThrottlePolicy; docTypeManager = new DocumentTypeManager(documentManagerConfig); this.sessionCache = sessionCache; feedReplyHandler = new FeedReplyReader(metric, metricsHelper); cron = new ScheduledThreadPoolExecutor(1, ThreadFactoryFactory.getThreadFactory("feedhandlerv3.cron")); cron.scheduleWithFixedDelay(this::removeOldClients, 16, 11, TimeUnit.MINUTES); this.metric = metric; - // 40% of the threads can be blocking on feeding before we deny requests. - if (threadpoolConfig != null) { - threadsAvailableForFeeding = new AtomicInteger(Math.max((int) (0.4 * threadpoolConfig.maxthreads()), 1)); - } else { - log.warning("No config for threadpool, using 200 for max blocking threads for feeding."); - threadsAvailableForFeeding = new AtomicInteger(200); - } } public void injectDocumentManangerForTests(DocumentTypeManager docTypeManager) { @@ -91,7 +99,7 @@ public class FeedHandlerV3 extends LoggingRequestHandler { clientId, metric, feedReplyHandler, - threadsAvailableForFeeding)); + httpThrottlePolicy)); } clientFeederV3 = clientFeederByClientId.get(clientId); } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java index 909c643a006..0e112c49c37 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/FeederSettings.java @@ -18,7 +18,6 @@ public class FeederSettings { private static final Route DEFAULT_ROUTE = Route.parse("default"); public final boolean drain; // TODO: Implement drain=true public final Route route; - public final boolean denyIfBusy; public final DataFormat dataFormat; public final String priority; public final Integer traceLevel; @@ -26,7 +25,6 @@ public class FeederSettings { public FeederSettings(HttpRequest request) { this.drain = Optional.ofNullable(request.getHeader(Headers.DRAIN)).map(Boolean::parseBoolean).orElse(false); this.route = Optional.ofNullable(request.getHeader(Headers.ROUTE)).map(Route::parse).orElse(DEFAULT_ROUTE); - this.denyIfBusy = Optional.ofNullable(request.getHeader(Headers.DENY_IF_BUSY)).map(Boolean::parseBoolean).orElse(false); // TODO: Change default to JSON on Vespa 8: this.dataFormat = Optional.ofNullable(request.getHeader(Headers.DATA_FORMAT)).map(DataFormat::valueOf).orElse(DataFormat.XML_UTF8); this.priority = request.getHeader(Headers.PRIORITY); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java index 4b16822f804..af6b82b472d 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedTesterV3.java @@ -2,7 +2,6 @@ package com.yahoo.vespa.http.server; import com.google.common.base.Splitter; -import com.yahoo.container.handler.ThreadpoolConfig; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.jdisc.messagebus.SessionCache; @@ -23,6 +22,7 @@ import com.yahoo.vespa.http.client.config.FeedParams; import com.yahoo.vespa.http.client.core.ErrorCode; import com.yahoo.vespa.http.client.core.Headers; import com.yahoo.vespa.http.client.core.OperationStatus; +import com.yahoo.vespa.http.server.ClientFeederV3.HttpThrottlePolicy; import org.junit.Test; import org.mockito.stubbing.Answer; @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.startsWith; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -42,9 +43,12 @@ import static org.mockito.Mockito.when; public class FeedTesterV3 { final CollectingMetric metric = new CollectingMetric(); + private static final HttpThrottlePolicy NON_THROTTLE = () -> false; + private static final HttpThrottlePolicy THROTTLE_ALWAYS = () -> true; + @Test public void feedOneDocument() throws Exception { - final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(null); + final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(NON_THROTTLE); HttpResponse httpResponse = feedHandlerV3.handle(createRequest(1)); ByteArrayOutputStream outStream = new ByteArrayOutputStream(); httpResponse.render(outStream); @@ -54,7 +58,7 @@ public class FeedTesterV3 { @Test public void feedOneBrokenDocument() throws Exception { - final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(null); + final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(NON_THROTTLE); HttpResponse httpResponse = feedHandlerV3.handle(createBrokenRequest()); ByteArrayOutputStream outStream = new ByteArrayOutputStream(); httpResponse.render(outStream); @@ -65,7 +69,7 @@ public class FeedTesterV3 { @Test public void feedManyDocument() throws Exception { - final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(null); + final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(NON_THROTTLE); HttpResponse httpResponse = feedHandlerV3.handle(createRequest(100)); ByteArrayOutputStream outStream = new ByteArrayOutputStream(); httpResponse.render(outStream); @@ -74,6 +78,15 @@ public class FeedTesterV3 { assertThat(Splitter.on("\n").splitToList(result).size(), is(101)); } + @Test + public void response_has_status_code_429_when_throttling() throws Exception { + final FeedHandlerV3 feedHandlerV3 = setupFeederHandler(THROTTLE_ALWAYS); + HttpResponse httpResponse = feedHandlerV3.handle(createRequest(100)); + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + httpResponse.render(outStream); + assertEquals(httpResponse.getStatus(), 429); + } + private static DocumentTypeManager createDoctypeManager() { DocumentTypeManager docTypeManager = new DocumentTypeManager(); DocumentType documentType = new DocumentType("testdocument"); @@ -113,35 +126,32 @@ public class FeedTesterV3 { return request; } - private FeedHandlerV3 setupFeederHandler(ThreadpoolConfig threadPoolConfig) throws Exception { + private FeedHandlerV3 setupFeederHandler(HttpThrottlePolicy policy) throws Exception { Executor threadPool = Executors.newCachedThreadPool(); + DocumentmanagerConfig docMan = new DocumentmanagerConfig(new DocumentmanagerConfig.Builder().enablecompression(true)); FeedHandlerV3 feedHandlerV3 = new FeedHandlerV3( threadPool, + policy, metric, AccessLog.voidAccessLog(), docMan, null /* session cache */, - threadPoolConfig /* thread pool config */, new DocumentApiMetrics(MetricReceiver.nullImplementation, "test")) { @Override protected ReferencedResource<SharedSourceSession> retainSource( SessionCache sessionCache, SourceSessionParams sessionParams) { SharedSourceSession sharedSourceSession = mock(SharedSourceSession.class); - try { - when(sharedSourceSession.sendMessageBlocking(any())).thenAnswer((Answer<?>) invocation -> { - Object[] args = invocation.getArguments(); - PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0]; - ReplyContext replyContext = (ReplyContext)putDocumentMessage.getContext(); - replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace")); - Result result = mock(Result.class); - when(result.isAccepted()).thenReturn(true); - return result; - }); - } catch (InterruptedException e) { - e.printStackTrace(); - } + when(sharedSourceSession.sendMessage(any())).thenAnswer((Answer<?>) invocation -> { + Object[] args = invocation.getArguments(); + PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0]; + ReplyContext replyContext = (ReplyContext)putDocumentMessage.getContext(); + replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace")); + Result result = mock(Result.class); + when(result.isAccepted()).thenReturn(true); + return result; + }); Result result = mock(Result.class); when(result.isAccepted()).thenReturn(true); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V3CongestionTestCase.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V3CongestionTestCase.java deleted file mode 100644 index 04b66480a82..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/V3CongestionTestCase.java +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.http.server; - -import com.yahoo.container.jdisc.HttpRequest; -import com.yahoo.document.DocumentTypeManager; -import com.yahoo.documentapi.metrics.DocumentApiMetrics; -import com.yahoo.jdisc.Metric; -import com.yahoo.jdisc.ReferencedResource; -import com.yahoo.jdisc.References; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.MessageBus; -import com.yahoo.messagebus.MessageBusParams; -import com.yahoo.messagebus.ReplyHandler; -import com.yahoo.messagebus.Result; -import com.yahoo.messagebus.SourceSessionParams; -import com.yahoo.messagebus.shared.SharedMessageBus; -import com.yahoo.messagebus.shared.SharedSourceSession; -import com.yahoo.metrics.simple.MetricReceiver; -import com.yahoo.vespa.http.client.core.Headers; -import com.yahoo.vespaxmlparser.FeedOperation; -import com.yahoo.vespaxmlparser.MockFeedReaderFactory; - -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.io.InputStream; - - -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertTrue; - - -public class V3CongestionTestCase { - AtomicInteger threadsAvail = new AtomicInteger(10); - AtomicInteger requests = new AtomicInteger(0); - - - static class ClientFeederWithMocks extends ClientFeederV3 { - - private final DocumentOperationMessageV3 docOp; - - ClientFeederWithMocks(ReferencedResource<SharedSourceSession> sourceSession, FeedReaderFactory feedReaderFactory, DocumentTypeManager docTypeManager, String clientId, Metric metric, ReplyHandler feedReplyHandler, AtomicInteger threadsAvailableForFeeding) { - super(sourceSession, feedReaderFactory, docTypeManager, clientId, metric, feedReplyHandler, threadsAvailableForFeeding); - // The operation to return from the client feeder. - docOp = DocumentOperationMessageV3.newRemoveMessage(FeedOperation.INVALID, "operation id"); - - } - - @Override - protected DocumentOperationMessageV3 getNextMessage( - String operationId, InputStream requestInputStream, FeederSettings settings) throws Exception { - while (true) { - int data = requestInputStream.read(); - if (data == -1 || data == (char)'\n') { - break; - } - } - return docOp; - } - } - - final static int NUMBER_OF_QUEUE_FULL_RESPONSES = 5; - - ClientFeederV3 clientFeederV3; - HttpRequest request; - - @Before - public void setup() { - // Set up a request to be used from the tests. - InputStream in = new MetaStream(new byte[] { 1 }); - request = HttpRequest - .createTestRequest( - "http://foo.bar:19020/reserved-for-internal-use/feedapi", - com.yahoo.jdisc.http.HttpRequest.Method.POST, in); - request.getJDiscRequest().headers().add(Headers.VERSION, "3"); - request.getJDiscRequest().headers().add(Headers.CLIENT_ID, "clientId"); - - - // Create a mock that does not parse the message, only reads the rest of the line. Makes it easier - // to write tests. It uses a mock for message bus. - clientFeederV3 = new ClientFeederWithMocks( - retainMockSession(new SourceSessionParams(), requests), - new MockFeedReaderFactory(), - null /*DocTypeManager*/, - "clientID", - null/*metric*/, - new FeedReplyReader(null/*metric*/, new DocumentApiMetrics(MetricReceiver.nullImplementation, "tester")), - threadsAvail); - } - - // A mock for message bus that can simulate blocking requests. - private static class MockSharedSession extends SharedSourceSession { - boolean queuFull = true; - AtomicInteger requests; - - public MockSharedSession(SourceSessionParams params, AtomicInteger requests) { - super(new SharedMessageBus(new MessageBus(new MockNetwork(), - new MessageBusParams())), params); - this.requests = requests; - } - - @Override - public Result sendMessageBlocking(Message msg) throws InterruptedException { - return sendMessage(msg); - } - - @Override - public Result sendMessage(Message msg) { - ReplyHandler handler = msg.popHandler(); - if (queuFull) { - requests.incrementAndGet(); - // Disable queue full after some attempts - if (requests.get() == NUMBER_OF_QUEUE_FULL_RESPONSES) { - queuFull = false; - } - Result r = new Result(ErrorCode.SEND_QUEUE_FULL, "queue full"); - return r; - } - - handler.handleReply(new MockReply(msg.getContext())); - return Result.ACCEPTED; - } - } - - ReferencedResource<SharedSourceSession> retainMockSession( - SourceSessionParams sessionParams, - AtomicInteger requests) { - final SharedSourceSession session = new MockSharedSession(sessionParams, requests); - return new ReferencedResource<>(session, References.fromResource(session)); - } - - @Test - public void testRetriesWhenThreadsAvailable() throws IOException { - request.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true"); - threadsAvail.set(10); - - clientFeederV3.handleRequest(request); - assertTrue(requests.get() == NUMBER_OF_QUEUE_FULL_RESPONSES); - } - - @Test - public void testNoRetriesWhenNoThreadsAvailable() throws IOException { - request.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "true"); - threadsAvail.set(0); - - clientFeederV3.handleRequest(request); - assertTrue(requests.get() == 1); - } - - @Test - public void testRetriesWhenNoThreadsAvailableButNoDenyIfBusy() throws IOException { - request.getJDiscRequest().headers().add(Headers.DENY_IF_BUSY, "false"); - threadsAvail.set(0); - - clientFeederV3.handleRequest(request); - assertTrue(requests.get() == NUMBER_OF_QUEUE_FULL_RESPONSES); - } -} |