diff options
Diffstat (limited to 'documentapi/src/main/java')
-rwxr-xr-x | documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java | 70 |
1 files changed, 67 insertions, 3 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 1c473a196f3..0da9d0ea621 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -327,9 +327,11 @@ public class MessageBusVisitorSession implements VisitorSession { private final String sessionName = createSessionName(); private final String dataDestination; private final Clock clock; + private final Object replyTrackingMonitor = new Object(); private StateDescription state; private long visitorCounter = 0; private long startTimeNanos = 0; + private long scheduledHandleReplyTasks = 0; // Must be protected by replyTrackingMonitor private boolean scheduledSendCreateVisitors = false; private boolean done = false; private boolean destroying = false; // For testing and sanity checking @@ -454,14 +456,40 @@ public class MessageBusVisitorSession implements VisitorSession { return state; } + private boolean hasScheduledHandleReplyTask() { + // This is synchronized instead of an AtomicLong simply because it makes it considerably + // easier to reason about happens-before relationships, memory visibility and sequencing + // of events across threads when an actual critical section is involved. + synchronized (replyTrackingMonitor) { + return scheduledHandleReplyTasks != 0; + } + } + + private void incrementScheduledHandleReplyTasks() { + synchronized (replyTrackingMonitor) { + ++scheduledHandleReplyTasks; + } + } + + private void decrementScheduleHandleReplyTasks() { + synchronized (replyTrackingMonitor) { + assert(scheduledHandleReplyTasks > 0); + --scheduledHandleReplyTasks; + } + } + private ReplyHandler createReplyHandler() { return (reply) -> { // Generally, handleReply will run in the context of the // underlying transport layer's processing thread(s), so we // schedule our own reply handling task to avoid blocking it. try { + // Make concurrent reply handling visible in sender thread, if it's active. + // See SendCreateVisitorsTask.run() for a rationale. + incrementScheduledHandleReplyTasks(); taskExecutor.submitTask(new HandleReplyTask(reply)); } catch (RejectedExecutionException e) { + decrementScheduleHandleReplyTasks(); // We cannot reliably handle reply tasks failing to be submitted, since // the reply task performs all our internal state handling logic. As such, // we just immediately go into a failure destruction mode as soon as this @@ -648,7 +676,39 @@ public class MessageBusVisitorSession implements VisitorSession { if (done) { return; // Session already closed; we must not touch anything else. } - while (progress.getIterator().hasNext()) { + // We both send requests and process replies in the context of a dedicated task executor pool. + // However, MessageBus sending and reply receiving happens in the context of entirely + // separate threads. If the backend responds very quickly to visitor requests (such as + // if buckets are empty), this can leave us in the following awkward position: + // + // 1. Replies arrive from backend, open up the throttle window, reply handling + // task gets pushed onto executor queue (but not yet executed). + // 2. Send loop below continuously get a free send slot, keeps sending visitors + // and filling up the set of pending buckets in the progress token. + // 3. Since visitor session is busy-looping in the send task, reply processing is + // consequently entirely starved until the MessageBus throttle window is bursting + // at the seams. This can effectively nullify the effects of the throttling policy, + // especially if it's dynamic. But a static throttle policy with a sufficiently + // high max window size will also potentially cause a runaway visitor train since + // the active window size keeps getting decreased by backend replies. + // + // To get around this, we explicitly check for concurrently scheduled message handling + // tasks from the transport layer, breaking the loop if at least one handler has been + // scheduled. This also has the (positive) effect of draining all reply tasks before we + // start sending more work downstream. + // + // Since visitor session progress is edge-triggered and progresses exclusively by sending + // new visitors in reply handling tasks, it's critical that we never end up in a situation + // where we have no pending CreateVisitors (or scheduled tasks), or we risk effectively + // hanging the session. We must therefore be very careful that we only exit the send loop + // if we _know_ we have at least one pending task enqueued that will ensure session progress. + // + // We're holding the session (token) lock around checking the pending reply tasks count, so + // if we observe a change we know that a reply task must have been scheduled and that its + // processing must take place sequenced after we have exited the loop, as the reply handling + // also takes the session (token) lock. I.e. it should not be possible to end up in a + // situation where we stall session progress due to not having any further event edges. + while (progress.getIterator().hasNext() && !hasScheduledHandleReplyTask()) { VisitorIterator.BucketProgress bucket = progress.getIterator().getNext(); Result result = sender.send(createMessage(bucket)); if (result.isAccepted()) { @@ -710,7 +770,7 @@ public class MessageBusVisitorSession implements VisitorSession { } private class HandleReplyTask implements Runnable { - private Reply reply; + private final Reply reply; HandleReplyTask(Reply reply) { this.reply = reply; } @@ -718,6 +778,10 @@ public class MessageBusVisitorSession implements VisitorSession { @Override public void run() { synchronized (progress.getToken()) { + // Decrement pending replies inside same lock as sender task to ensure that if the sender + // observes a non-zero number of reply tasks, it's guaranteed that this actually means a + // task _will_ be run later at some point. + decrementScheduleHandleReplyTasks(); try { assert(pendingMessageCount > 0); --pendingMessageCount; @@ -748,7 +812,7 @@ public class MessageBusVisitorSession implements VisitorSession { } private class HandleMessageTask implements Runnable { - private Message message; + private final Message message; private HandleMessageTask(Message message) { this.message = message; |