summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-02-02 17:43:16 +0100
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-02-03 14:54:52 +0100
commitf5e2d1f7465107aada62516db7344549e061df76 (patch)
treebd94c7840c126dc0891ad9e0b79227a56c8e9061 /documentapi
parentf095e16d4fa4b919fb1bf1a7f509322638d0fa6b (diff)
Avoid visitor reply processing starvation and runaway throttling
We both send requests and process replies in the context of a dedicated task executor pool. However, MessageBus sending and reply receiving happen in the context of entirely separate threads. If the backend responds very quickly to visitor requests (such as if buckets are empty), this can leave us in the following awkward position: 1. Replies arrive from backend, open up the throttle window, reply handling task gets pushed onto executor queue (but not yet executed). 2. Send loop below continuously gets a free send slot, keeps sending visitors and filling up the set of pending buckets in the progress token. 3. Since visitor session is busy-looping in the send task, reply processing is consequently entirely starved until the MessageBus throttle window is bursting at the seams. This can effectively nullify the effects of the throttling policy, especially if it's dynamic. But a static throttle policy with a sufficiently high max window size will also potentially cause a runaway visitor train since the active window size keeps getting decreased by backend replies. To get around this, we explicitly check for concurrently scheduled message handling tasks from the transport layer, breaking the loop if at least one handler has been scheduled. This also has the (positive) effect of draining all reply tasks before we start sending more work downstream. Since visitor session progress is edge-triggered and progresses exclusively by sending new visitors in reply handling tasks, it's critical that we never end up in a situation where we have no pending CreateVisitors (or scheduled tasks), or we risk effectively hanging the session. We must therefore be very careful that we only exit the send loop if we _know_ we have at least one pending task enqueued that will ensure session progress. 
This is a subtle thread interaction issue and cannot readily be unit tested, but manual testing has confirmed both the underlying issue and that the throttling policy does not run amok after the fix has been applied.
Diffstat (limited to 'documentapi')
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java70
1 files changed, 67 insertions, 3 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 1c473a196f3..0da9d0ea621 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -327,9 +327,11 @@ public class MessageBusVisitorSession implements VisitorSession {
private final String sessionName = createSessionName();
private final String dataDestination;
private final Clock clock;
+ private final Object replyTrackingMonitor = new Object();
private StateDescription state;
private long visitorCounter = 0;
private long startTimeNanos = 0;
+ private long scheduledHandleReplyTasks = 0; // Must be protected by replyTrackingMonitor
private boolean scheduledSendCreateVisitors = false;
private boolean done = false;
private boolean destroying = false; // For testing and sanity checking
@@ -454,14 +456,40 @@ public class MessageBusVisitorSession implements VisitorSession {
return state;
}
+ private boolean hasScheduledHandleReplyTask() {
+ // Returns true while at least one reply handling task has been counted as scheduled and has not
+ // yet been counted down again. Guarded by a monitor rather than an AtomicLong simply because an
+ // actual critical section makes happens-before, memory visibility and event sequencing easier to reason about.
+ synchronized (replyTrackingMonitor) {
+ return scheduledHandleReplyTasks != 0;
+ }
+ }
+
+ private void incrementScheduledHandleReplyTasks() { // Counts one more scheduled reply handling task; the reply handler calls this just before submitting a HandleReplyTask.
+ synchronized (replyTrackingMonitor) { // The counter must only be touched while holding this monitor.
+ ++scheduledHandleReplyTasks;
+ }
+ }
+
+ private void decrementScheduleHandleReplyTasks() { // Counts one scheduled reply handling task back down: either the task has started running or its submission was rejected. NOTE(review): name says "Schedule", not "Scheduled" like the increment counterpart.
+ synchronized (replyTrackingMonitor) {
+ assert(scheduledHandleReplyTasks > 0); // Every decrement must pair with an earlier increment; only checked when assertions are enabled.
+ --scheduledHandleReplyTasks;
+ }
+ }
+
private ReplyHandler createReplyHandler() {
return (reply) -> {
// Generally, handleReply will run in the context of the
// underlying transport layer's processing thread(s), so we
// schedule our own reply handling task to avoid blocking it.
try {
+ // Make concurrent reply handling visible in sender thread, if it's active.
+ // See SendCreateVisitorsTask.run() for a rationale.
+ incrementScheduledHandleReplyTasks();
taskExecutor.submitTask(new HandleReplyTask(reply));
} catch (RejectedExecutionException e) {
+ decrementScheduleHandleReplyTasks();
// We cannot reliably handle reply tasks failing to be submitted, since
// the reply task performs all our internal state handling logic. As such,
// we just immediately go into a failure destruction mode as soon as this
@@ -648,7 +676,39 @@ public class MessageBusVisitorSession implements VisitorSession {
if (done) {
return; // Session already closed; we must not touch anything else.
}
- while (progress.getIterator().hasNext()) {
+ // We both send requests and process replies in the context of a dedicated task executor pool.
+ // However, MessageBus sending and reply receiving happen in the context of entirely
+ // separate threads. If the backend responds very quickly to visitor requests (such as
+ // if buckets are empty), this can leave us in the following awkward position:
+ //
+ // 1. Replies arrive from backend, open up the throttle window, reply handling
+ // task gets pushed onto executor queue (but not yet executed).
+ // 2. Send loop below continuously gets a free send slot, keeps sending visitors
+ // and filling up the set of pending buckets in the progress token.
+ // 3. Since visitor session is busy-looping in the send task, reply processing is
+ // consequently entirely starved until the MessageBus throttle window is bursting
+ // at the seams. This can effectively nullify the effects of the throttling policy,
+ // especially if it's dynamic. But a static throttle policy with a sufficiently
+ // high max window size will also potentially cause a runaway visitor train since
+ // the active window size keeps getting decreased by backend replies.
+ //
+ // To get around this, we explicitly check for concurrently scheduled message handling
+ // tasks from the transport layer, breaking the loop if at least one handler has been
+ // scheduled. This also has the (positive) effect of draining all reply tasks before we
+ // start sending more work downstream.
+ //
+ // Since visitor session progress is edge-triggered and progresses exclusively by sending
+ // new visitors in reply handling tasks, it's critical that we never end up in a situation
+ // where we have no pending CreateVisitors (or scheduled tasks), or we risk effectively
+ // hanging the session. We must therefore be very careful that we only exit the send loop
+ // if we _know_ we have at least one pending task enqueued that will ensure session progress.
+ //
+ // We're holding the session (token) lock around checking the pending reply tasks count, so
+ // if we observe a change we know that a reply task must have been scheduled and that its
+ // processing must take place sequenced after we have exited the loop, as the reply handling
+ // also takes the session (token) lock. I.e. it should not be possible to end up in a
+ // situation where we stall session progress due to not having any further event edges.
+ while (progress.getIterator().hasNext() && !hasScheduledHandleReplyTask()) {
VisitorIterator.BucketProgress bucket = progress.getIterator().getNext();
Result result = sender.send(createMessage(bucket));
if (result.isAccepted()) {
@@ -710,7 +770,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private class HandleReplyTask implements Runnable {
- private Reply reply;
+ private final Reply reply;
HandleReplyTask(Reply reply) {
this.reply = reply;
}
@@ -718,6 +778,10 @@ public class MessageBusVisitorSession implements VisitorSession {
@Override
public void run() {
synchronized (progress.getToken()) {
+ // Decrement pending replies inside same lock as sender task to ensure that if the sender
+ // observes a non-zero number of reply tasks, it's guaranteed that this actually means a
+ // task _will_ be run later at some point.
+ decrementScheduleHandleReplyTasks();
try {
assert(pendingMessageCount > 0);
--pendingMessageCount;
@@ -748,7 +812,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private class HandleMessageTask implements Runnable {
- private Message message;
+ private final Message message;
private HandleMessageTask(Message message) {
this.message = message;