summaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java')
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java70
1 files changed, 67 insertions, 3 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 1c473a196f3..0da9d0ea621 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -327,9 +327,11 @@ public class MessageBusVisitorSession implements VisitorSession {
private final String sessionName = createSessionName();
private final String dataDestination;
private final Clock clock;
+ private final Object replyTrackingMonitor = new Object();
private StateDescription state;
private long visitorCounter = 0;
private long startTimeNanos = 0;
+ private long scheduledHandleReplyTasks = 0; // Must be protected by replyTrackingMonitor
private boolean scheduledSendCreateVisitors = false;
private boolean done = false;
private boolean destroying = false; // For testing and sanity checking
@@ -454,14 +456,40 @@ public class MessageBusVisitorSession implements VisitorSession {
return state;
}
+ private boolean hasScheduledHandleReplyTask() {
+ // This is synchronized instead of an AtomicLong simply because it makes it considerably
+ // easier to reason about happens-before relationships, memory visibility and sequencing
+ // of events across threads when an actual critical section is involved.
+ synchronized (replyTrackingMonitor) {
+ return scheduledHandleReplyTasks != 0;
+ }
+ }
+
+ private void incrementScheduledHandleReplyTasks() {
+ synchronized (replyTrackingMonitor) {
+ ++scheduledHandleReplyTasks;
+ }
+ }
+
+ private void decrementScheduleHandleReplyTasks() {
+ synchronized (replyTrackingMonitor) {
+ assert(scheduledHandleReplyTasks > 0);
+ --scheduledHandleReplyTasks;
+ }
+ }
+
private ReplyHandler createReplyHandler() {
return (reply) -> {
// Generally, handleReply will run in the context of the
// underlying transport layer's processing thread(s), so we
// schedule our own reply handling task to avoid blocking it.
try {
+ // Make concurrent reply handling visible in sender thread, if it's active.
+ // See SendCreateVisitorsTask.run() for a rationale.
+ incrementScheduledHandleReplyTasks();
taskExecutor.submitTask(new HandleReplyTask(reply));
} catch (RejectedExecutionException e) {
+ decrementScheduleHandleReplyTasks();
// We cannot reliably handle reply tasks failing to be submitted, since
// the reply task performs all our internal state handling logic. As such,
// we just immediately go into a failure destruction mode as soon as this
@@ -648,7 +676,39 @@ public class MessageBusVisitorSession implements VisitorSession {
if (done) {
return; // Session already closed; we must not touch anything else.
}
- while (progress.getIterator().hasNext()) {
+ // We both send requests and process replies in the context of a dedicated task executor pool.
+ // However, MessageBus sending and reply receiving happens in the context of entirely
+ // separate threads. If the backend responds very quickly to visitor requests (such as
+ // if buckets are empty), this can leave us in the following awkward position:
+ //
+ // 1. Replies arrive from backend, open up the throttle window, reply handling
+ // task gets pushed onto executor queue (but not yet executed).
+ // 2. Send loop below continuously get a free send slot, keeps sending visitors
+ // and filling up the set of pending buckets in the progress token.
+ // 3. Since visitor session is busy-looping in the send task, reply processing is
+ // consequently entirely starved until the MessageBus throttle window is bursting
+ // at the seams. This can effectively nullify the effects of the throttling policy,
+ // especially if it's dynamic. But a static throttle policy with a sufficiently
+ // high max window size will also potentially cause a runaway visitor train since
+ // the active window size keeps getting decreased by backend replies.
+ //
+ // To get around this, we explicitly check for concurrently scheduled message handling
+ // tasks from the transport layer, breaking the loop if at least one handler has been
+ // scheduled. This also has the (positive) effect of draining all reply tasks before we
+ // start sending more work downstream.
+ //
+ // Since visitor session progress is edge-triggered and progresses exclusively by sending
+ // new visitors in reply handling tasks, it's critical that we never end up in a situation
+ // where we have no pending CreateVisitors (or scheduled tasks), or we risk effectively
+ // hanging the session. We must therefore be very careful that we only exit the send loop
+ // if we _know_ we have at least one pending task enqueued that will ensure session progress.
+ //
+ // We're holding the session (token) lock around checking the pending reply tasks count, so
+ // if we observe a change we know that a reply task must have been scheduled and that its
+ // processing must take place sequenced after we have exited the loop, as the reply handling
+ // also takes the session (token) lock. I.e. it should not be possible to end up in a
+ // situation where we stall session progress due to not having any further event edges.
+ while (progress.getIterator().hasNext() && !hasScheduledHandleReplyTask()) {
VisitorIterator.BucketProgress bucket = progress.getIterator().getNext();
Result result = sender.send(createMessage(bucket));
if (result.isAccepted()) {
@@ -710,7 +770,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private class HandleReplyTask implements Runnable {
- private Reply reply;
+ private final Reply reply;
HandleReplyTask(Reply reply) {
this.reply = reply;
}
@@ -718,6 +778,10 @@ public class MessageBusVisitorSession implements VisitorSession {
@Override
public void run() {
synchronized (progress.getToken()) {
+ // Decrement pending replies inside same lock as sender task to ensure that if the sender
+ // observes a non-zero number of reply tasks, it's guaranteed that this actually means a
+ // task _will_ be run later at some point.
+ decrementScheduleHandleReplyTasks();
try {
assert(pendingMessageCount > 0);
--pendingMessageCount;
@@ -748,7 +812,7 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private class HandleMessageTask implements Runnable {
- private Message message;
+ private final Message message;
private HandleMessageTask(Message message) {
this.message = message;