aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java149
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java121
2 files changed, 209 insertions, 61 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 42d6d2e66a3..273d5d89ac2 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -73,6 +73,10 @@ public class MessageBusVisitorSession implements VisitorSession {
void scheduleTask(Runnable event, long delay, TimeUnit unit);
}
+ public interface Clock {
+ long monotonicNanoTime();
+ }
+
public static class VisitingProgress {
private final VisitorIterator iterator;
private final ProgressToken token;
@@ -268,6 +272,13 @@ public class MessageBusVisitorSession implements VisitorSession {
}
}
+ public static class RealClock implements Clock {
+ @Override
+ public long monotonicNanoTime() {
+ return System.nanoTime();
+ }
+ }
+
private static final Logger log = Logger.getLogger(MessageBusVisitorSession.class.getName());
private static long sessionCounter = 0;
@@ -288,8 +299,10 @@ public class MessageBusVisitorSession implements VisitorSession {
private final VisitorStatistics statistics;
private final String sessionName = createSessionName();
private final String dataDestination;
+ private final Clock clock;
private StateDescription state;
private long visitorCounter = 0;
+ private long startTimeNanos = 0;
private boolean scheduledSendCreateVisitors = false;
private boolean done = false;
private boolean destroying = false; // For testing and sanity checking
@@ -309,6 +322,19 @@ public class MessageBusVisitorSession implements VisitorSession {
RoutingTable routingTable)
throws ParseException
{
+ this(visitorParameters, taskExecutor, senderFactory,
+ receiverFactory, routingTable, new RealClock());
+ }
+
+ // TODO builder pattern
+ public MessageBusVisitorSession(VisitorParameters visitorParameters,
+ AsyncTaskExecutor taskExecutor,
+ SenderFactory senderFactory,
+ ReceiverFactory receiverFactory,
+ RoutingTable routingTable,
+ Clock clock)
+ throws ParseException
+ {
this.params = visitorParameters; // TODO(vekterli): make copy? legacy impl does not copy
initializeRoute(routingTable);
this.sender = senderFactory.createSender(createReplyHandler(), this.params);
@@ -317,6 +343,7 @@ public class MessageBusVisitorSession implements VisitorSession {
this.progress = createVisitingProgress(params);
this.statistics = new VisitorStatistics();
this.state = new StateDescription(State.NOT_STARTED);
+ this.clock = clock;
initializeHandlers();
trace = new Trace(visitorParameters.getTraceLevel());
dataDestination = (params.getLocalDataHandler() == null
@@ -339,13 +366,14 @@ public class MessageBusVisitorSession implements VisitorSession {
public void start() {
synchronized (progress.getToken()) {
+ this.startTimeNanos = clock.monotonicNanoTime();
if (progress.getIterator().isDone()) {
log.log(LogLevel.DEBUG, sessionName + ": progress token indicates " +
"session is done before it could even start; no-op");
return;
}
transitionTo(new StateDescription(State.WORKING));
- taskExecutor.submitTask(new SendCreateVisitorsTask());
+ taskExecutor.submitTask(new SendCreateVisitorsTask(sessionTimeoutMs()));
}
}
@@ -367,15 +395,19 @@ public class MessageBusVisitorSession implements VisitorSession {
assert(state.getState() == State.NOT_STARTED);
state = newState;
} else if (newState.getState() == State.COMPLETED) {
- if (state.getState() != State.ABORTED && state.getState() != State.FAILED) {
+ if (!state.failed()) {
state = newState;
- } // else: don't override aborted state
+ } // else: don't override existing failure state
} else if (newState.getState() == State.ABORTED) {
state = newState;
} else if (newState.getState() == State.FAILED) {
if (state.getState() != State.FAILED) {
state = newState;
- } // else: don't override failed state
+ } // else: don't override existing failure state
+ } else if (newState.getState() == State.TIMED_OUT) {
+ if (!state.failed()) {
+ state = newState;
+ } // else: don't override existing failure state
} else {
assert(false);
}
@@ -384,29 +416,26 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private ReplyHandler createReplyHandler() {
- return new ReplyHandler() {
- @Override
- public void handleReply(Reply reply) {
- // Generally, handleReply will run in the context of the
- // underlying transport layer's processing thread(s), so we
- // schedule our own reply handling task to avoid blocking it.
- try {
- taskExecutor.submitTask(new HandleReplyTask(reply));
- } catch (RejectedExecutionException e) {
- // We cannot reliably handle reply tasks failing to be submitted, since
- // the reply task performs all our internal state handling logic. As such,
- // we just immediately go into a failure destruction mode as soon as this
- // happens, in which we do not wait for any active messages to be replied
- // to.
- log.log(LogLevel.WARNING, "Visitor session '" + sessionName +
- "': failed to submit reply task to executor service! " +
- "Session cannot reliably continue; terminating it early.", e);
-
- synchronized (progress.getToken()) {
- transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage()));
- if (!done) {
- markSessionCompleted();
- }
+ return (reply) -> {
+ // Generally, handleReply will run in the context of the
+ // underlying transport layer's processing thread(s), so we
+ // schedule our own reply handling task to avoid blocking it.
+ try {
+ taskExecutor.submitTask(new HandleReplyTask(reply));
+ } catch (RejectedExecutionException e) {
+ // We cannot reliably handle reply tasks failing to be submitted, since
+ // the reply task performs all our internal state handling logic. As such,
+ // we just immediately go into a failure destruction mode as soon as this
+ // happens, in which we do not wait for any active messages to be replied
+ // to.
+ log.log(LogLevel.WARNING, "Visitor session '" + sessionName +
+ "': failed to submit reply task to executor service! " +
+ "Session cannot reliably continue; terminating it early.", e);
+
+ synchronized (progress.getToken()) {
+ transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage()));
+ if (!done) {
+ markSessionCompleted();
}
}
}
@@ -414,19 +443,16 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private MessageHandler createMessageHandler() {
- return new MessageHandler() {
- @Override
- public void handleMessage(Message message) {
- try {
- taskExecutor.submitTask(new HandleMessageTask(message));
- } catch (RejectedExecutionException e) {
- Reply reply = ((DocumentMessage)message).createReply();
- message.swapState(reply);
- reply.addError(new Error(
- DocumentProtocol.ERROR_ABORTED,
- "Visitor session has been aborted"));
- receiver.reply(reply);
- }
+ return (message) -> {
+ try {
+ taskExecutor.submitTask(new HandleMessageTask(message));
+ } catch (RejectedExecutionException e) {
+ Reply reply = ((DocumentMessage)message).createReply();
+ message.swapState(reply);
+ reply.addError(new Error(
+ DocumentProtocol.ERROR_ABORTED,
+ "Visitor session has been aborted"));
+ receiver.reply(reply);
}
};
}
@@ -530,6 +556,12 @@ public class MessageBusVisitorSession implements VisitorSession {
// All private methods in this task must be protected by a lock around
// the progress token!
+ private final long messageTimeoutMs;
+
+ public SendCreateVisitorsTask(long messageTimeoutMs) {
+ this.messageTimeoutMs = messageTimeoutMs;
+ }
+
private String getNextVisitorId() {
StringBuilder sb = new StringBuilder();
++visitorCounter;
@@ -545,9 +577,7 @@ public class MessageBusVisitorSession implements VisitorSession {
dataDestination);
msg.getTrace().setLevel(params.getTraceLevel());
- msg.setTimeRemaining(params.getTimeoutMs() != -1
- ? params.getTimeoutMs()
- : 5 * 60 * 1000);
+ msg.setTimeRemaining(messageTimeoutMs);
msg.setBuckets(Arrays.asList(bucket.getSuperbucket(), bucket.getProgress()));
msg.setDocumentSelection(params.getDocumentSelection());
msg.setFromTimestamp(params.getFromTimestamp());
@@ -892,23 +922,44 @@ public class MessageBusVisitorSession implements VisitorSession {
|| enoughHitsReceived());
}
+ private long sessionTimeoutMs() {
+ return (params.getTimeoutMs() != -1) ? params.getTimeoutMs() : 5 * 60 * 1000;
+ }
+
+ private long elapsedTimeMs() {
+ return TimeUnit.NANOSECONDS.toMillis(clock.monotonicNanoTime() - startTimeNanos);
+ }
+
/**
* Schedule a new SendCreateVisitors task iff there are still buckets to
* visit, the visiting has not failed fatally and we haven't already
* scheduled such a task.
*/
private void scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) {
- if (scheduledSendCreateVisitors
- || !progress.getIterator().hasNext()
- || state.failed()
- || enoughHitsReceived())
- {
+ final long elapsedMs = elapsedTimeMs();
+ final long timeoutMs = sessionTimeoutMs();
+ if (elapsedMs >= timeoutMs) {
+ transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", timeoutMs)));
+ if (visitingCompleted()) {
+ markSessionCompleted();
+ }
return;
}
- taskExecutor.scheduleTask(new SendCreateVisitorsTask(), delay, unit);
+ if (!mayScheduleCreateVisitorsTask()) {
+ return;
+ }
+ final long messageTimeoutMs = timeoutMs - elapsedMs;
+ taskExecutor.scheduleTask(new SendCreateVisitorsTask(messageTimeoutMs), delay, unit);
scheduledSendCreateVisitors = true;
}
+ private boolean mayScheduleCreateVisitorsTask() {
+ return ! (scheduledSendCreateVisitors
+ || !progress.getIterator().hasNext()
+ || state.failed()
+ || enoughHitsReceived());
+ }
+
private void scheduleSendCreateVisitorsIfApplicable() {
scheduleSendCreateVisitorsIfApplicable(0, TimeUnit.MILLISECONDS);
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
index 6df4c75695c..a9c8ee88082 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
+// TODO replace explicit pre-mockito mock classes with proper mockito mocks wherever possible
public class MessageBusVisitorSessionTestCase {
private class MockSender implements MessageBusVisitorSession.Sender {
private int maxPending = 1000;
@@ -393,11 +394,23 @@ public class MessageBusVisitorSessionTestCase {
}
}
+ private class MockClock implements MessageBusVisitorSession.Clock {
+ private long monotonicTime = 0;
+
+ @Override
+ public long monotonicNanoTime() { return monotonicTime; }
+
+ public void setMonotonicTime(long monotonicTime, TimeUnit unit) {
+ this.monotonicTime = unit.toNanos(monotonicTime);
+ }
+ }
+
private MessageBusVisitorSession createVisitorSession(MockSender sender,
- MockReceiver receiver,
- MockAsyncTaskExecutor executor,
- VisitorParameters visitorParameters,
- RoutingTable routingTable)
+ MockReceiver receiver,
+ MockAsyncTaskExecutor executor,
+ VisitorParameters visitorParameters,
+ RoutingTable routingTable,
+ MockClock clock)
{
if (routingTable == null) {
routingTable = new RoutingTable(new RoutingTableSpec(DocumentProtocol.NAME));
@@ -408,18 +421,19 @@ public class MessageBusVisitorSessionTestCase {
executor,
new MockSenderFactory(sender),
new MockReceiverFactory(receiver),
- routingTable);
+ routingTable,
+ clock);
} catch (ParseException e) {
throw new IllegalArgumentException("Bad document selection", e);
}
}
private MessageBusVisitorSession createVisitorSession(MockSender sender,
- MockReceiver receiver,
- MockAsyncTaskExecutor executor,
- VisitorParameters visitorParameters)
+ MockReceiver receiver,
+ MockAsyncTaskExecutor executor,
+ VisitorParameters visitorParameters)
{
- return createVisitorSession(sender, receiver, executor, visitorParameters, null);
+ return createVisitorSession(sender, receiver, executor, visitorParameters, null, new MockClock());
}
VisitorParameters createVisitorParameters(String selection) {
@@ -551,6 +565,7 @@ public class MessageBusVisitorSessionTestCase {
public MockControlHandler controlHandler;
public MockDataHandler dataHandler;
public MessageBusVisitorSession visitorSession;
+ public MockClock clock;
public MockComponents(VisitorParameters visitorParameters) {
this(visitorParameters, null);
@@ -563,9 +578,10 @@ public class MessageBusVisitorSessionTestCase {
params = visitorParameters;
controlHandler = new MockControlHandler();
dataHandler = new MockDataHandler();
+ clock = new MockClock();
params.setControlHandler(controlHandler);
params.setLocalDataHandler(dataHandler);
- visitorSession = createVisitorSession(sender, receiver, executor, params, routingTable);
+ visitorSession = createVisitorSession(sender, receiver, executor, params, routingTable, clock);
}
public MockComponents() {
@@ -584,7 +600,8 @@ public class MessageBusVisitorSessionTestCase {
params = builder.params;
controlHandler = builder.controlHandler;
dataHandler = builder.dataHandler;
- visitorSession = createVisitorSession(sender, receiver, executor, params, builder.routingTable);
+ clock = builder.clock;
+ visitorSession = createVisitorSession(sender, receiver, executor, params, builder.routingTable, clock);
}
}
@@ -596,6 +613,7 @@ public class MessageBusVisitorSessionTestCase {
public MockControlHandler controlHandler = new MockControlHandler();
public MockDataHandler dataHandler = new MockDataHandler();
public RoutingTable routingTable = null;
+ public MockClock clock = new MockClock();
public MockComponents createMockComponents() {
return new MockComponents(this);
@@ -2430,6 +2448,86 @@ public class MessageBusVisitorSessionTestCase {
mc.receiver.repliesToString());
}
+ private MockComponents createTimeoutMocksAtInitialTime(long timeoutMs, long currentTimeMs, int maxPending) {
+ MockComponentsBuilder builder = new MockComponentsBuilder();
+ builder.params.setTimeoutMs(timeoutMs);
+ builder.params.setControlHandler(builder.controlHandler);
+ MockComponents mc = builder.createMockComponents();
+ mc.sender.setMaxPending(maxPending);
+ mc.clock.setMonotonicTime(currentTimeMs, TimeUnit.MILLISECONDS); // Baseline time
+
+ mc.visitorSession.start();
+ mc.controlHandler.resetMock(); // clear messages
+ mc.executor.expectAndProcessTasks(1);
+ return mc;
+ }
+
+ @Test
+ public void visitor_command_timeout_set_to_remaining_session_timeout() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(10_000, 10_000, 1);
+
+ // Superbucket 1 of 2
+ assertEquals("CreateVisitorMessage(buckets=[\n" +
+ "BucketId(0x0400000000000000)\n" +
+ "BucketId(0x0000000000000000)\n" +
+ "]\n" +
+ "time remaining=10000\n)",
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
+
+ mc.clock.setMonotonicTime(14, TimeUnit.SECONDS); // 4 seconds elapsed from baseline
+ mc.executor.expectAndProcessTasks(1); // reply
+ mc.executor.expectAndProcessTasks(1); // send create visitors
+ // Superbucket 2 of 2
+ assertEquals("CreateVisitorMessage(buckets=[\n" +
+ "BucketId(0x0400000000000001)\n" +
+ "BucketId(0x0000000000000000)\n" +
+ "]\n" +
+ "time remaining=6000\n)",
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
+ }
+
+ @Test
+ public void fail_session_with_timeout_if_timeout_has_elapsed() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(4_000, 20_000, 1);
+
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2
+ mc.clock.setMonotonicTime(24_000, TimeUnit.MILLISECONDS); // 4 second timeout expired
+
+ // Reply task processing shall discover that timeout has expired
+ mc.executor.expectAndProcessTasks(1);
+ mc.executor.expectNoTasks(); // No further send tasks enqueued
+ assertTrue(mc.controlHandler.isDone());
+ assertEquals("onProgress : 0 active, 1 pending, 1 finished, 2 total\n" +
+ "onVisitorStatistics : 0 buckets visited, 0 docs returned\n" +
+ "onDone : TIMEOUT - 'Session timeout of 4000 ms expired'\n",
+ mc.controlHandler.toString());
+ }
+
+ @Test
+ public void timeout_with_pending_messages_does_not_close_session_until_all_replies_received() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(5_000, 20_000, 2);
+
+ assertEquals(2, mc.sender.getMessageCount());
+
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2
+ mc.clock.setMonotonicTime(25_000, TimeUnit.MILLISECONDS);
+
+ mc.executor.expectAndProcessTasks(1);
+ mc.executor.expectNoTasks(); // No further send tasks enqueued
+ assertFalse(mc.controlHandler.isDone()); // Still pending messages, session _not_ yet done.
+
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 2 of 2
+ mc.controlHandler.resetMock();
+ mc.executor.expectAndProcessTasks(1);
+ mc.executor.expectNoTasks(); // No further send tasks enqueued
+ assertTrue(mc.controlHandler.isDone()); // Now it's done.
+
+ assertEquals("onProgress : 0 active, 0 pending, 2 finished, 2 total\n" +
+ "onVisitorStatistics : 0 buckets visited, 0 docs returned\n" +
+ "onDone : TIMEOUT - 'Session timeout of 5000 ms expired'\n",
+ mc.controlHandler.toString());
+ }
+
/**
* TODOs:
* - parameter validation (max pending, ...)
@@ -2437,7 +2535,6 @@ public class MessageBusVisitorSessionTestCase {
* - [add percent finished to progress file; ticket 5360824]
*/
- // TODO: what about session-wide timeouts?
// TODO: consider refactoring locking granularity
// TODO: figure out if we risk a re-run of the "too many tasks" issue
}