aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2020-09-30 11:24:47 +0200
committerGitHub <noreply@github.com>2020-09-30 11:24:47 +0200
commitff205ce5e2eccafeb0957007fb2671f1488e57c3 (patch)
treee28a42874e48c939139a241ac5600aebd65e1baf
parentbd6373019e2844f0c20cc1f7696a3e017be9c08c (diff)
parent4a59ca6b5122c228185782d19dffa7eaf61d2b4f (diff)
Merge pull request #14631 from vespa-engine/bjorncs/feed-handler-fix
Block feed requests while messagebus queue is full MERGEOK
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java7
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java22
2 files changed, 17 insertions, 12 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
index 7959945e8a0..6e9f3cc590e 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/vespa/http/server/ClientFeederV3.java
@@ -114,6 +114,7 @@ class ClientFeederV3 {
}
}
} catch (InterruptedException e) {
+ log.log(Level.FINE, e, () -> "Feed handler was interrupted: " + e.getMessage());
// NOP, just terminate
} catch (Throwable e) {
log.log(Level.WARNING, "Unhandled exception while feeding: " + Exceptions.toMessageString(e), e);
@@ -157,14 +158,14 @@ class ClientFeederV3 {
}
}
- private Result sendMessage(DocumentOperationMessageV3 msg) {
+ private Result sendMessage(DocumentOperationMessageV3 msg) throws InterruptedException {
msg.getMessage().pushHandler(feedReplyHandler);
- return sourceSession.getResource().sendMessage(msg.getMessage());
+ return sourceSession.getResource().sendMessageBlocking(msg.getMessage());
}
private void feed(FeederSettings settings,
InputStream requestInputStream,
- BlockingQueue<OperationStatus> repliesFromOldMessages) {
+ BlockingQueue<OperationStatus> repliesFromOldMessages) throws InterruptedException {
while (true) {
Optional<DocumentOperationMessageV3> message = pullMessageFromRequest(settings,
requestInputStream,
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java
index bda2d8ea861..4d56448b587 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/vespa/http/server/FeedHandlerV3Test.java
@@ -127,15 +127,19 @@ public class FeedHandlerV3Test {
SessionCache sessionCache, SourceSessionParams sessionParams) {
SharedSourceSession sharedSourceSession = mock(SharedSourceSession.class);
- when(sharedSourceSession.sendMessage(any())).thenAnswer((Answer<?>) invocation -> {
- Object[] args = invocation.getArguments();
- PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0];
- ReplyContext replyContext = (ReplyContext)putDocumentMessage.getContext();
- replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace"));
- Result result = mock(Result.class);
- when(result.isAccepted()).thenReturn(true);
- return result;
- });
+ try {
+ when(sharedSourceSession.sendMessageBlocking(any())).thenAnswer((Answer<?>) invocation -> {
+ Object[] args = invocation.getArguments();
+ PutDocumentMessage putDocumentMessage = (PutDocumentMessage) args[0];
+ ReplyContext replyContext = (ReplyContext) putDocumentMessage.getContext();
+ replyContext.feedReplies.add(new OperationStatus("message", replyContext.docId, ErrorCode.OK, false, "trace"));
+ Result result = mock(Result.class);
+ when(result.isAccepted()).thenReturn(true);
+ return result;
+ });
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
Result result = mock(Result.class);
when(result.isAccepted()).thenReturn(true);