From 4a59ca6b5122c228185782d19dffa7eaf61d2b4f Mon Sep 17 00:00:00 2001 From: Bjørn Christian Seime Date: Wed, 30 Sep 2020 11:15:23 +0200 Subject: Block feed requests while messagebus queue is full Fix for transient error 'full messagebus queue' being reported back to vespa-http-client. The backpressure handling in ContainerThreadPool/ThreadedRequestHandler will respond with 299/429 when the pipeline is blocked. --- .../yahoo/vespa/http/server/ClientFeederV3.java | 7 ++++--- .../yahoo/vespa/http/server/FeedHandlerV3Test.java | 22 +++++++++++++--------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java index 7959945e8a0..6e9f3cc590e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java @@ -114,6 +114,7 @@ class ClientFeederV3 { } } } catch (InterruptedException e) { + log.log(Level.FINE, e, () -> "Feed handler was interrupted: " + e.getMessage()); // NOP, just terminate } catch (Throwable e) { log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e); @@ -157,14 +158,14 @@ class ClientFeederV3 { } } - private Result sendMessage(DocumentOperationMessageV3 msg) { + private Result sendMessage(DocumentOperationMessageV3 msg) throws InterruptedException { msg.getMessage().pushHandler(feedReplyHandler); - return sourceSession.getResource().sendMessage(msg.getMessage()); + return sourceSession.getResource().sendMessageBlocking(msg.getMessage()); } private void feed(FeederSettings settings, InputStream requestInputStream, - BlockingQueue repliesFromOldMessages) { + BlockingQueue repliesFromOldMessages) throws InterruptedException { while (true) { Optional message = pullMessageFromRequest(settings, requestInputStream, diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java index bda2d8ea861..4d56448b587 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java @@ -127,15 +127,19 @@ public class FeedHandlerV3Test { SessionCache sessionCache, SourceSessionParams sessionParams) { SharedSourceSession sharedSourceSession = mock(SharedSourceSession.class); - when(sharedSourceSession.sendMessage(any())).thenAnswer((Answer) invocation -> { - Object[] args = invocation.getArguments(); - PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0]; - ReplyContext replyContext = (ReplyContext)putDocumentMessage.getContext(); - replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace")); - Result result = mock(Result.class); - when(result.isAccepted()).thenReturn(true); - return result; - }); + try { + when(sharedSourceSession.sendMessageBlocking(any())).thenAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0]; + ReplyContext replyContext = (ReplyContext) putDocumentMessage.getContext(); + replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace")); + Result result = mock(Result.class); + when(result.isAccepted()).thenReturn(true); + return result; + }); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } Result result = mock(Result.class); when(result.isAccepted()).thenReturn(true); -- cgit v1.2.3