diff options
Diffstat (limited to 'documentapi')
2 files changed, 209 insertions, 61 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 42d6d2e66a3..273d5d89ac2 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -73,6 +73,10 @@ public class MessageBusVisitorSession implements VisitorSession { void scheduleTask(Runnable event, long delay, TimeUnit unit); } + public interface Clock { + long monotonicNanoTime(); + } + public static class VisitingProgress { private final VisitorIterator iterator; private final ProgressToken token; @@ -268,6 +272,13 @@ public class MessageBusVisitorSession implements VisitorSession { } } + public static class RealClock implements Clock { + @Override + public long monotonicNanoTime() { + return System.nanoTime(); + } + } + private static final Logger log = Logger.getLogger(MessageBusVisitorSession.class.getName()); private static long sessionCounter = 0; @@ -288,8 +299,10 @@ public class MessageBusVisitorSession implements VisitorSession { private final VisitorStatistics statistics; private final String sessionName = createSessionName(); private final String dataDestination; + private final Clock clock; private StateDescription state; private long visitorCounter = 0; + private long startTimeNanos = 0; private boolean scheduledSendCreateVisitors = false; private boolean done = false; private boolean destroying = false; // For testing and sanity checking @@ -309,6 +322,19 @@ public class MessageBusVisitorSession implements VisitorSession { RoutingTable routingTable) throws ParseException { + this(visitorParameters, taskExecutor, senderFactory, + receiverFactory, routingTable, new RealClock()); + } + + // TODO builder pattern + public MessageBusVisitorSession(VisitorParameters visitorParameters, + AsyncTaskExecutor taskExecutor, + SenderFactory senderFactory, + ReceiverFactory receiverFactory, + RoutingTable routingTable, + Clock clock) + throws ParseException + { this.params = visitorParameters; // TODO(vekterli): make copy? legacy impl does not copy initializeRoute(routingTable); this.sender = senderFactory.createSender(createReplyHandler(), this.params); @@ -317,6 +343,7 @@ public class MessageBusVisitorSession implements VisitorSession { this.progress = createVisitingProgress(params); this.statistics = new VisitorStatistics(); this.state = new StateDescription(State.NOT_STARTED); + this.clock = clock; initializeHandlers(); trace = new Trace(visitorParameters.getTraceLevel()); dataDestination = (params.getLocalDataHandler() == null @@ -339,13 +366,14 @@ public class MessageBusVisitorSession implements VisitorSession { public void start() { synchronized (progress.getToken()) { + this.startTimeNanos = clock.monotonicNanoTime(); if (progress.getIterator().isDone()) { log.log(LogLevel.DEBUG, sessionName + ": progress token indicates " + "session is done before it could even start; no-op"); return; } transitionTo(new StateDescription(State.WORKING)); - taskExecutor.submitTask(new SendCreateVisitorsTask()); + taskExecutor.submitTask(new SendCreateVisitorsTask(sessionTimeoutMs())); } } @@ -367,15 +395,19 @@ public class MessageBusVisitorSession implements VisitorSession { assert(state.getState() == State.NOT_STARTED); state = newState; } else if (newState.getState() == State.COMPLETED) { - if (state.getState() != State.ABORTED && state.getState() != State.FAILED) { + if (!state.failed()) { state = newState; - } // else: don't override aborted state + } // else: don't override existing failure state } else if (newState.getState() == State.ABORTED) { state = newState; } else if (newState.getState() == State.FAILED) { if (state.getState() != State.FAILED) { state = newState; - } // else: don't override failed state + } // else: don't override existing failure state + } else if (newState.getState() == State.TIMED_OUT) { + if (!state.failed()) { + state = newState; + } // else: don't override existing failure state } else { assert(false); } @@ -384,29 +416,26 @@ public class MessageBusVisitorSession implements VisitorSession { } private ReplyHandler createReplyHandler() { - return new ReplyHandler() { - @Override - public void handleReply(Reply reply) { - // Generally, handleReply will run in the context of the - // underlying transport layer's processing thread(s), so we - // schedule our own reply handling task to avoid blocking it. - try { - taskExecutor.submitTask(new HandleReplyTask(reply)); - } catch (RejectedExecutionException e) { - // We cannot reliably handle reply tasks failing to be submitted, since - // the reply task performs all our internal state handling logic. As such, - // we just immediately go into a failure destruction mode as soon as this - // happens, in which we do not wait for any active messages to be replied - // to. - log.log(LogLevel.WARNING, "Visitor session '" + sessionName + - "': failed to submit reply task to executor service! " + - "Session cannot reliably continue; terminating it early.", e); - - synchronized (progress.getToken()) { - transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage())); - if (!done) { - markSessionCompleted(); - } + return (reply) -> { + // Generally, handleReply will run in the context of the + // underlying transport layer's processing thread(s), so we + // schedule our own reply handling task to avoid blocking it. + try { + taskExecutor.submitTask(new HandleReplyTask(reply)); + } catch (RejectedExecutionException e) { + // We cannot reliably handle reply tasks failing to be submitted, since + // the reply task performs all our internal state handling logic. As such, + // we just immediately go into a failure destruction mode as soon as this + // happens, in which we do not wait for any active messages to be replied + // to. + log.log(LogLevel.WARNING, "Visitor session '" + sessionName + + "': failed to submit reply task to executor service! " + + "Session cannot reliably continue; terminating it early.", e); + + synchronized (progress.getToken()) { + transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage())); + if (!done) { + markSessionCompleted(); } } } @@ -414,19 +443,16 @@ public class MessageBusVisitorSession implements VisitorSession { } private MessageHandler createMessageHandler() { - return new MessageHandler() { - @Override - public void handleMessage(Message message) { - try { - taskExecutor.submitTask(new HandleMessageTask(message)); - } catch (RejectedExecutionException e) { - Reply reply = ((DocumentMessage)message).createReply(); - message.swapState(reply); - reply.addError(new Error( - DocumentProtocol.ERROR_ABORTED, - "Visitor session has been aborted")); - receiver.reply(reply); - } + return (message) -> { + try { + taskExecutor.submitTask(new HandleMessageTask(message)); + } catch (RejectedExecutionException e) { + Reply reply = ((DocumentMessage)message).createReply(); + message.swapState(reply); + reply.addError(new Error( + DocumentProtocol.ERROR_ABORTED, + "Visitor session has been aborted")); + receiver.reply(reply); } }; } @@ -530,6 +556,12 @@ public class MessageBusVisitorSession implements VisitorSession { // All private methods in this task must be protected by a lock around // the progress token! + private final long messageTimeoutMs; + + public SendCreateVisitorsTask(long messageTimeoutMs) { + this.messageTimeoutMs = messageTimeoutMs; + } + private String getNextVisitorId() { StringBuilder sb = new StringBuilder(); ++visitorCounter; @@ -545,9 +577,7 @@ public class MessageBusVisitorSession implements VisitorSession { dataDestination); msg.getTrace().setLevel(params.getTraceLevel()); - msg.setTimeRemaining(params.getTimeoutMs() != -1 - ? params.getTimeoutMs() - : 5 * 60 * 1000); + msg.setTimeRemaining(messageTimeoutMs); msg.setBuckets(Arrays.asList(bucket.getSuperbucket(), bucket.getProgress())); msg.setDocumentSelection(params.getDocumentSelection()); msg.setFromTimestamp(params.getFromTimestamp()); @@ -892,23 +922,44 @@ public class MessageBusVisitorSession implements VisitorSession { || enoughHitsReceived()); } + private long sessionTimeoutMs() { + return (params.getTimeoutMs() != -1) ? params.getTimeoutMs() : 5 * 60 * 1000; + } + + private long elapsedTimeMs() { + return TimeUnit.NANOSECONDS.toMillis(clock.monotonicNanoTime() - startTimeNanos); + } + /** * Schedule a new SendCreateVisitors task iff there are still buckets to * visit, the visiting has not failed fatally and we haven't already * scheduled such a task. */ private void scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) { - if (scheduledSendCreateVisitors - || !progress.getIterator().hasNext() - || state.failed() - || enoughHitsReceived()) - { + final long elapsedMs = elapsedTimeMs(); + final long timeoutMs = sessionTimeoutMs(); + if (elapsedMs >= timeoutMs) { + transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", timeoutMs))); + if (visitingCompleted()) { + markSessionCompleted(); + } return; } - taskExecutor.scheduleTask(new SendCreateVisitorsTask(), delay, unit); + if (!mayScheduleCreateVisitorsTask()) { + return; + } + final long messageTimeoutMs = timeoutMs - elapsedMs; + taskExecutor.scheduleTask(new SendCreateVisitorsTask(messageTimeoutMs), delay, unit); scheduledSendCreateVisitors = true; } + private boolean mayScheduleCreateVisitorsTask() { + return ! (scheduledSendCreateVisitors + || !progress.getIterator().hasNext() + || state.failed() + || enoughHitsReceived()); + } + private void scheduleSendCreateVisitorsIfApplicable() { scheduleSendCreateVisitorsIfApplicable(0, TimeUnit.MILLISECONDS); } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java index 6df4c75695c..a9c8ee88082 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; +// TODO replace explicit pre-mockito mock classes with proper mockito mocks wherever possible public class MessageBusVisitorSessionTestCase { private class MockSender implements MessageBusVisitorSession.Sender { private int maxPending = 1000; @@ -393,11 +394,23 @@ public class MessageBusVisitorSessionTestCase { } } + private class MockClock implements MessageBusVisitorSession.Clock { + private long monotonicTime = 0; + + @Override + public long monotonicNanoTime() { return monotonicTime; } + + public void setMonotonicTime(long monotonicTime, TimeUnit unit) { + this.monotonicTime = unit.toNanos(monotonicTime); + } + } + private MessageBusVisitorSession createVisitorSession(MockSender sender, - MockReceiver receiver, - MockAsyncTaskExecutor executor, - VisitorParameters visitorParameters, - RoutingTable routingTable) + MockReceiver receiver, + MockAsyncTaskExecutor executor, + VisitorParameters visitorParameters, + RoutingTable routingTable, + MockClock clock) { if (routingTable == null) { routingTable = new RoutingTable(new RoutingTableSpec(DocumentProtocol.NAME)); @@ -408,18 +421,19 @@ public class MessageBusVisitorSessionTestCase { executor, new MockSenderFactory(sender), new MockReceiverFactory(receiver), - routingTable); + routingTable, + clock); } catch (ParseException e) { throw new IllegalArgumentException("Bad document selection", e); } } private MessageBusVisitorSession createVisitorSession(MockSender sender, - MockReceiver receiver, - MockAsyncTaskExecutor executor, - VisitorParameters visitorParameters) + MockReceiver receiver, + MockAsyncTaskExecutor executor, + VisitorParameters visitorParameters) { - return createVisitorSession(sender, receiver, executor, visitorParameters, null); + return createVisitorSession(sender, receiver, executor, visitorParameters, null, new MockClock()); } VisitorParameters createVisitorParameters(String selection) { @@ -551,6 +565,7 @@ public class MessageBusVisitorSessionTestCase { public MockControlHandler controlHandler; public MockDataHandler dataHandler; public MessageBusVisitorSession visitorSession; + public MockClock clock; public MockComponents(VisitorParameters visitorParameters) { this(visitorParameters, null); @@ -563,9 +578,10 @@ public class MessageBusVisitorSessionTestCase { params = visitorParameters; controlHandler = new MockControlHandler(); dataHandler = new MockDataHandler(); + clock = new MockClock(); params.setControlHandler(controlHandler); params.setLocalDataHandler(dataHandler); - visitorSession = createVisitorSession(sender, receiver, executor, params, routingTable); + visitorSession = createVisitorSession(sender, receiver, executor, params, routingTable, clock); } public MockComponents() { @@ -584,7 +600,8 @@ public class MessageBusVisitorSessionTestCase { params = builder.params; controlHandler = builder.controlHandler; dataHandler = builder.dataHandler; - visitorSession = createVisitorSession(sender, receiver, executor, params, builder.routingTable); + clock = builder.clock; + visitorSession = createVisitorSession(sender, receiver, executor, params, builder.routingTable, clock); } } @@ -596,6 +613,7 @@ public class MessageBusVisitorSessionTestCase { public MockControlHandler controlHandler = new MockControlHandler(); public MockDataHandler dataHandler = new MockDataHandler(); public RoutingTable routingTable = null; + public MockClock clock = new MockClock(); public MockComponents createMockComponents() { return new MockComponents(this); @@ -2430,6 +2448,86 @@ public class MessageBusVisitorSessionTestCase { mc.receiver.repliesToString()); } + private MockComponents createTimeoutMocksAtInitialTime(long timeoutMs, long currentTimeMs, int maxPending) { + MockComponentsBuilder builder = new MockComponentsBuilder(); + builder.params.setTimeoutMs(timeoutMs); + builder.params.setControlHandler(builder.controlHandler); + MockComponents mc = builder.createMockComponents(); + mc.sender.setMaxPending(maxPending); + mc.clock.setMonotonicTime(currentTimeMs, TimeUnit.MILLISECONDS); // Baseline time + + mc.visitorSession.start(); + mc.controlHandler.resetMock(); // clear messages + mc.executor.expectAndProcessTasks(1); + return mc; + } + + @Test + public void visitor_command_timeout_set_to_remaining_session_timeout() { + MockComponents mc = createTimeoutMocksAtInitialTime(10_000, 10_000, 1); + + // Superbucket 1 of 2 + assertEquals("CreateVisitorMessage(buckets=[\n" + + "BucketId(0x0400000000000000)\n" + + "BucketId(0x0000000000000000)\n" + + "]\n" + + "time remaining=10000\n)", + replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET)); + + mc.clock.setMonotonicTime(14, TimeUnit.SECONDS); // 4 seconds elapsed from baseline + mc.executor.expectAndProcessTasks(1); // reply + mc.executor.expectAndProcessTasks(1); // send create visitors + // Superbucket 2 of 2 + assertEquals("CreateVisitorMessage(buckets=[\n" + + "BucketId(0x0400000000000001)\n" + + "BucketId(0x0000000000000000)\n" + + "]\n" + + "time remaining=6000\n)", + replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET)); + } + + @Test + public void fail_session_with_timeout_if_timeout_has_elapsed() { + MockComponents mc = createTimeoutMocksAtInitialTime(4_000, 20_000, 1); + + replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2 + mc.clock.setMonotonicTime(24_000, TimeUnit.MILLISECONDS); // 4 second timeout expired + + // Reply task processing shall discover that timeout has expired + mc.executor.expectAndProcessTasks(1); + mc.executor.expectNoTasks(); // No further send tasks enqueued + assertTrue(mc.controlHandler.isDone()); + assertEquals("onProgress : 0 active, 1 pending, 1 finished, 2 total\n" + + "onVisitorStatistics : 0 buckets visited, 0 docs returned\n" + + "onDone : TIMEOUT - 'Session timeout of 4000 ms expired'\n", + mc.controlHandler.toString()); + } + + @Test + public void timeout_with_pending_messages_does_not_close_session_until_all_replies_received() { + MockComponents mc = createTimeoutMocksAtInitialTime(5_000, 20_000, 2); + + assertEquals(2, mc.sender.getMessageCount()); + + replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2 + mc.clock.setMonotonicTime(25_000, TimeUnit.MILLISECONDS); + + mc.executor.expectAndProcessTasks(1); + mc.executor.expectNoTasks(); // No further send tasks enqueued + assertFalse(mc.controlHandler.isDone()); // Still pending messages, session _not_ yet done. + + replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 2 of 2 + mc.controlHandler.resetMock(); + mc.executor.expectAndProcessTasks(1); + mc.executor.expectNoTasks(); // No further send tasks enqueued + assertTrue(mc.controlHandler.isDone()); // Now it's done. + + assertEquals("onProgress : 0 active, 0 pending, 2 finished, 2 total\n" + + "onVisitorStatistics : 0 buckets visited, 0 docs returned\n" + + "onDone : TIMEOUT - 'Session timeout of 5000 ms expired'\n", + mc.controlHandler.toString()); + } + /** * TODOs: * - parameter validation (max pending, ...) @@ -2437,7 +2535,6 @@ public class MessageBusVisitorSessionTestCase { * - [add percent finished to progress file; ticket 5360824] */ - // TODO: what about session-wide timeouts? // TODO: consider refactoring locking granularity // TODO: figure out if we risk a re-run of the "too many tasks" issue } |