aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2016-12-07 16:13:20 +0100
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2016-12-07 16:13:20 +0100
commit4bf536c16dacd9dc85146ce4daa3c76f0e889ccf (patch)
treef1c3776fc0d30c1dcc2094760c933d932d3f7bf6 /documentapi
parentcbdcb16c3d6fcc52dc0e758118a4f56503890c77 (diff)
Improve visitor session timeout handling
Send visitors with request timeouts relative to the session timeout and the elapsed session time. Fail the session if timeout has elapsed when trying to send additional visitor requests.
Diffstat (limited to 'documentapi')
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java149
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java121
2 files changed, 209 insertions, 61 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index 42d6d2e66a3..273d5d89ac2 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -73,6 +73,10 @@ public class MessageBusVisitorSession implements VisitorSession {
void scheduleTask(Runnable event, long delay, TimeUnit unit);
}
+ public interface Clock {
+ long monotonicNanoTime();
+ }
+
public static class VisitingProgress {
private final VisitorIterator iterator;
private final ProgressToken token;
@@ -268,6 +272,13 @@ public class MessageBusVisitorSession implements VisitorSession {
}
}
+ public static class RealClock implements Clock {
+ @Override
+ public long monotonicNanoTime() {
+ return System.nanoTime();
+ }
+ }
+
private static final Logger log = Logger.getLogger(MessageBusVisitorSession.class.getName());
private static long sessionCounter = 0;
@@ -288,8 +299,10 @@ public class MessageBusVisitorSession implements VisitorSession {
private final VisitorStatistics statistics;
private final String sessionName = createSessionName();
private final String dataDestination;
+ private final Clock clock;
private StateDescription state;
private long visitorCounter = 0;
+ private long startTimeNanos = 0;
private boolean scheduledSendCreateVisitors = false;
private boolean done = false;
private boolean destroying = false; // For testing and sanity checking
@@ -309,6 +322,19 @@ public class MessageBusVisitorSession implements VisitorSession {
RoutingTable routingTable)
throws ParseException
{
+ this(visitorParameters, taskExecutor, senderFactory,
+ receiverFactory, routingTable, new RealClock());
+ }
+
+ // TODO builder pattern
+ public MessageBusVisitorSession(VisitorParameters visitorParameters,
+ AsyncTaskExecutor taskExecutor,
+ SenderFactory senderFactory,
+ ReceiverFactory receiverFactory,
+ RoutingTable routingTable,
+ Clock clock)
+ throws ParseException
+ {
this.params = visitorParameters; // TODO(vekterli): make copy? legacy impl does not copy
initializeRoute(routingTable);
this.sender = senderFactory.createSender(createReplyHandler(), this.params);
@@ -317,6 +343,7 @@ public class MessageBusVisitorSession implements VisitorSession {
this.progress = createVisitingProgress(params);
this.statistics = new VisitorStatistics();
this.state = new StateDescription(State.NOT_STARTED);
+ this.clock = clock;
initializeHandlers();
trace = new Trace(visitorParameters.getTraceLevel());
dataDestination = (params.getLocalDataHandler() == null
@@ -339,13 +366,14 @@ public class MessageBusVisitorSession implements VisitorSession {
public void start() {
synchronized (progress.getToken()) {
+ this.startTimeNanos = clock.monotonicNanoTime();
if (progress.getIterator().isDone()) {
log.log(LogLevel.DEBUG, sessionName + ": progress token indicates " +
"session is done before it could even start; no-op");
return;
}
transitionTo(new StateDescription(State.WORKING));
- taskExecutor.submitTask(new SendCreateVisitorsTask());
+ taskExecutor.submitTask(new SendCreateVisitorsTask(sessionTimeoutMs()));
}
}
@@ -367,15 +395,19 @@ public class MessageBusVisitorSession implements VisitorSession {
assert(state.getState() == State.NOT_STARTED);
state = newState;
} else if (newState.getState() == State.COMPLETED) {
- if (state.getState() != State.ABORTED && state.getState() != State.FAILED) {
+ if (!state.failed()) {
state = newState;
- } // else: don't override aborted state
+ } // else: don't override existing failure state
} else if (newState.getState() == State.ABORTED) {
state = newState;
} else if (newState.getState() == State.FAILED) {
if (state.getState() != State.FAILED) {
state = newState;
- } // else: don't override failed state
+ } // else: don't override existing failure state
+ } else if (newState.getState() == State.TIMED_OUT) {
+ if (!state.failed()) {
+ state = newState;
+ } // else: don't override existing failure state
} else {
assert(false);
}
@@ -384,29 +416,26 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private ReplyHandler createReplyHandler() {
- return new ReplyHandler() {
- @Override
- public void handleReply(Reply reply) {
- // Generally, handleReply will run in the context of the
- // underlying transport layer's processing thread(s), so we
- // schedule our own reply handling task to avoid blocking it.
- try {
- taskExecutor.submitTask(new HandleReplyTask(reply));
- } catch (RejectedExecutionException e) {
- // We cannot reliably handle reply tasks failing to be submitted, since
- // the reply task performs all our internal state handling logic. As such,
- // we just immediately go into a failure destruction mode as soon as this
- // happens, in which we do not wait for any active messages to be replied
- // to.
- log.log(LogLevel.WARNING, "Visitor session '" + sessionName +
- "': failed to submit reply task to executor service! " +
- "Session cannot reliably continue; terminating it early.", e);
-
- synchronized (progress.getToken()) {
- transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage()));
- if (!done) {
- markSessionCompleted();
- }
+ return (reply) -> {
+ // Generally, handleReply will run in the context of the
+ // underlying transport layer's processing thread(s), so we
+ // schedule our own reply handling task to avoid blocking it.
+ try {
+ taskExecutor.submitTask(new HandleReplyTask(reply));
+ } catch (RejectedExecutionException e) {
+ // We cannot reliably handle reply tasks failing to be submitted, since
+ // the reply task performs all our internal state handling logic. As such,
+ // we just immediately go into a failure destruction mode as soon as this
+ // happens, in which we do not wait for any active messages to be replied
+ // to.
+ log.log(LogLevel.WARNING, "Visitor session '" + sessionName +
+ "': failed to submit reply task to executor service! " +
+ "Session cannot reliably continue; terminating it early.", e);
+
+ synchronized (progress.getToken()) {
+ transitionTo(new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + e.getMessage()));
+ if (!done) {
+ markSessionCompleted();
}
}
}
@@ -414,19 +443,16 @@ public class MessageBusVisitorSession implements VisitorSession {
}
private MessageHandler createMessageHandler() {
- return new MessageHandler() {
- @Override
- public void handleMessage(Message message) {
- try {
- taskExecutor.submitTask(new HandleMessageTask(message));
- } catch (RejectedExecutionException e) {
- Reply reply = ((DocumentMessage)message).createReply();
- message.swapState(reply);
- reply.addError(new Error(
- DocumentProtocol.ERROR_ABORTED,
- "Visitor session has been aborted"));
- receiver.reply(reply);
- }
+ return (message) -> {
+ try {
+ taskExecutor.submitTask(new HandleMessageTask(message));
+ } catch (RejectedExecutionException e) {
+ Reply reply = ((DocumentMessage)message).createReply();
+ message.swapState(reply);
+ reply.addError(new Error(
+ DocumentProtocol.ERROR_ABORTED,
+ "Visitor session has been aborted"));
+ receiver.reply(reply);
}
};
}
@@ -530,6 +556,12 @@ public class MessageBusVisitorSession implements VisitorSession {
// All private methods in this task must be protected by a lock around
// the progress token!
+ private final long messageTimeoutMs;
+
+ public SendCreateVisitorsTask(long messageTimeoutMs) {
+ this.messageTimeoutMs = messageTimeoutMs;
+ }
+
private String getNextVisitorId() {
StringBuilder sb = new StringBuilder();
++visitorCounter;
@@ -545,9 +577,7 @@ public class MessageBusVisitorSession implements VisitorSession {
dataDestination);
msg.getTrace().setLevel(params.getTraceLevel());
- msg.setTimeRemaining(params.getTimeoutMs() != -1
- ? params.getTimeoutMs()
- : 5 * 60 * 1000);
+ msg.setTimeRemaining(messageTimeoutMs);
msg.setBuckets(Arrays.asList(bucket.getSuperbucket(), bucket.getProgress()));
msg.setDocumentSelection(params.getDocumentSelection());
msg.setFromTimestamp(params.getFromTimestamp());
@@ -892,23 +922,44 @@ public class MessageBusVisitorSession implements VisitorSession {
|| enoughHitsReceived());
}
+ private long sessionTimeoutMs() {
+ return (params.getTimeoutMs() != -1) ? params.getTimeoutMs() : 5 * 60 * 1000;
+ }
+
+ private long elapsedTimeMs() {
+ return TimeUnit.NANOSECONDS.toMillis(clock.monotonicNanoTime() - startTimeNanos);
+ }
+
/**
* Schedule a new SendCreateVisitors task iff there are still buckets to
* visit, the visiting has not failed fatally and we haven't already
* scheduled such a task.
*/
private void scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) {
- if (scheduledSendCreateVisitors
- || !progress.getIterator().hasNext()
- || state.failed()
- || enoughHitsReceived())
- {
+ final long elapsedMs = elapsedTimeMs();
+ final long timeoutMs = sessionTimeoutMs();
+ if (elapsedMs >= timeoutMs) {
+ transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", timeoutMs)));
+ if (visitingCompleted()) {
+ markSessionCompleted();
+ }
return;
}
- taskExecutor.scheduleTask(new SendCreateVisitorsTask(), delay, unit);
+ if (!mayScheduleCreateVisitorsTask()) {
+ return;
+ }
+ final long messageTimeoutMs = timeoutMs - elapsedMs;
+ taskExecutor.scheduleTask(new SendCreateVisitorsTask(messageTimeoutMs), delay, unit);
scheduledSendCreateVisitors = true;
}
+ private boolean mayScheduleCreateVisitorsTask() {
+ return ! (scheduledSendCreateVisitors
+ || !progress.getIterator().hasNext()
+ || state.failed()
+ || enoughHitsReceived());
+ }
+
private void scheduleSendCreateVisitorsIfApplicable() {
scheduleSendCreateVisitorsIfApplicable(0, TimeUnit.MILLISECONDS);
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
index 6df4c75695c..a9c8ee88082 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
+// TODO replace explicit pre-mockito mock classes with proper mockito mocks wherever possible
public class MessageBusVisitorSessionTestCase {
private class MockSender implements MessageBusVisitorSession.Sender {
private int maxPending = 1000;
@@ -393,11 +394,23 @@ public class MessageBusVisitorSessionTestCase {
}
}
+ private class MockClock implements MessageBusVisitorSession.Clock {
+ private long monotonicTime = 0;
+
+ @Override
+ public long monotonicNanoTime() { return monotonicTime; }
+
+ public void setMonotonicTime(long monotonicTime, TimeUnit unit) {
+ this.monotonicTime = unit.toNanos(monotonicTime);
+ }
+ }
+
private MessageBusVisitorSession createVisitorSession(MockSender sender,
- MockReceiver receiver,
- MockAsyncTaskExecutor executor,
- VisitorParameters visitorParameters,
- RoutingTable routingTable)
+ MockReceiver receiver,
+ MockAsyncTaskExecutor executor,
+ VisitorParameters visitorParameters,
+ RoutingTable routingTable,
+ MockClock clock)
{
if (routingTable == null) {
routingTable = new RoutingTable(new RoutingTableSpec(DocumentProtocol.NAME));
@@ -408,18 +421,19 @@ public class MessageBusVisitorSessionTestCase {
executor,
new MockSenderFactory(sender),
new MockReceiverFactory(receiver),
- routingTable);
+ routingTable,
+ clock);
} catch (ParseException e) {
throw new IllegalArgumentException("Bad document selection", e);
}
}
private MessageBusVisitorSession createVisitorSession(MockSender sender,
- MockReceiver receiver,
- MockAsyncTaskExecutor executor,
- VisitorParameters visitorParameters)
+ MockReceiver receiver,
+ MockAsyncTaskExecutor executor,
+ VisitorParameters visitorParameters)
{
- return createVisitorSession(sender, receiver, executor, visitorParameters, null);
+ return createVisitorSession(sender, receiver, executor, visitorParameters, null, new MockClock());
}
VisitorParameters createVisitorParameters(String selection) {
@@ -551,6 +565,7 @@ public class MessageBusVisitorSessionTestCase {
public MockControlHandler controlHandler;
public MockDataHandler dataHandler;
public MessageBusVisitorSession visitorSession;
+ public MockClock clock;
public MockComponents(VisitorParameters visitorParameters) {
this(visitorParameters, null);
@@ -563,9 +578,10 @@ public class MessageBusVisitorSessionTestCase {
params = visitorParameters;
controlHandler = new MockControlHandler();
dataHandler = new MockDataHandler();
+ clock = new MockClock();
params.setControlHandler(controlHandler);
params.setLocalDataHandler(dataHandler);
- visitorSession = createVisitorSession(sender, receiver, executor, params, routingTable);
+ visitorSession = createVisitorSession(sender, receiver, executor, params, routingTable, clock);
}
public MockComponents() {
@@ -584,7 +600,8 @@ public class MessageBusVisitorSessionTestCase {
params = builder.params;
controlHandler = builder.controlHandler;
dataHandler = builder.dataHandler;
- visitorSession = createVisitorSession(sender, receiver, executor, params, builder.routingTable);
+ clock = builder.clock;
+ visitorSession = createVisitorSession(sender, receiver, executor, params, builder.routingTable, clock);
}
}
@@ -596,6 +613,7 @@ public class MessageBusVisitorSessionTestCase {
public MockControlHandler controlHandler = new MockControlHandler();
public MockDataHandler dataHandler = new MockDataHandler();
public RoutingTable routingTable = null;
+ public MockClock clock = new MockClock();
public MockComponents createMockComponents() {
return new MockComponents(this);
@@ -2430,6 +2448,86 @@ public class MessageBusVisitorSessionTestCase {
mc.receiver.repliesToString());
}
+ private MockComponents createTimeoutMocksAtInitialTime(long timeoutMs, long currentTimeMs, int maxPending) {
+ MockComponentsBuilder builder = new MockComponentsBuilder();
+ builder.params.setTimeoutMs(timeoutMs);
+ builder.params.setControlHandler(builder.controlHandler);
+ MockComponents mc = builder.createMockComponents();
+ mc.sender.setMaxPending(maxPending);
+ mc.clock.setMonotonicTime(currentTimeMs, TimeUnit.MILLISECONDS); // Baseline time
+
+ mc.visitorSession.start();
+ mc.controlHandler.resetMock(); // clear messages
+ mc.executor.expectAndProcessTasks(1);
+ return mc;
+ }
+
+ @Test
+ public void visitor_command_timeout_set_to_remaining_session_timeout() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(10_000, 10_000, 1);
+
+ // Superbucket 1 of 2
+ assertEquals("CreateVisitorMessage(buckets=[\n" +
+ "BucketId(0x0400000000000000)\n" +
+ "BucketId(0x0000000000000000)\n" +
+ "]\n" +
+ "time remaining=10000\n)",
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
+
+ mc.clock.setMonotonicTime(14, TimeUnit.SECONDS); // 4 seconds elapsed from baseline
+ mc.executor.expectAndProcessTasks(1); // reply
+ mc.executor.expectAndProcessTasks(1); // send create visitors
+ // Superbucket 2 of 2
+ assertEquals("CreateVisitorMessage(buckets=[\n" +
+ "BucketId(0x0400000000000001)\n" +
+ "BucketId(0x0000000000000000)\n" +
+ "]\n" +
+ "time remaining=6000\n)",
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
+ }
+
+ @Test
+ public void fail_session_with_timeout_if_timeout_has_elapsed() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(4_000, 20_000, 1);
+
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2
+ mc.clock.setMonotonicTime(24_000, TimeUnit.MILLISECONDS); // 4 second timeout expired
+
+ // Reply task processing shall discover that timeout has expired
+ mc.executor.expectAndProcessTasks(1);
+ mc.executor.expectNoTasks(); // No further send tasks enqueued
+ assertTrue(mc.controlHandler.isDone());
+ assertEquals("onProgress : 0 active, 1 pending, 1 finished, 2 total\n" +
+ "onVisitorStatistics : 0 buckets visited, 0 docs returned\n" +
+ "onDone : TIMEOUT - 'Session timeout of 4000 ms expired'\n",
+ mc.controlHandler.toString());
+ }
+
+ @Test
+ public void timeout_with_pending_messages_does_not_close_session_until_all_replies_received() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(5_000, 20_000, 2);
+
+ assertEquals(2, mc.sender.getMessageCount());
+
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2
+ mc.clock.setMonotonicTime(25_000, TimeUnit.MILLISECONDS);
+
+ mc.executor.expectAndProcessTasks(1);
+ mc.executor.expectNoTasks(); // No further send tasks enqueued
+ assertFalse(mc.controlHandler.isDone()); // Still pending messages, session _not_ yet done.
+
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 2 of 2
+ mc.controlHandler.resetMock();
+ mc.executor.expectAndProcessTasks(1);
+ mc.executor.expectNoTasks(); // No further send tasks enqueued
+ assertTrue(mc.controlHandler.isDone()); // Now it's done.
+
+ assertEquals("onProgress : 0 active, 0 pending, 2 finished, 2 total\n" +
+ "onVisitorStatistics : 0 buckets visited, 0 docs returned\n" +
+ "onDone : TIMEOUT - 'Session timeout of 5000 ms expired'\n",
+ mc.controlHandler.toString());
+ }
+
/**
* TODOs:
* - parameter validation (max pending, ...)
@@ -2437,7 +2535,6 @@ public class MessageBusVisitorSessionTestCase {
* - [add percent finished to progress file; ticket 5360824]
*/
- // TODO: what about session-wide timeouts?
// TODO: consider refactoring locking granularity
// TODO: figure out if we risk a re-run of the "too many tasks" issue
}