diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-01-19 16:03:30 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-01-19 16:03:30 +0100 |
commit | 705bbb4ce9446b02ae097b9dc43e53b7637abe8a (patch) | |
tree | fdb7de74ed2c4ccef00473656fa32e33681d0a92 /documentapi | |
parent | c9d2c693573606fa7cafd6a90bc5b0f1a07ffde5 (diff) |
Add separate session-wide visitor timeout parameter
This augments today's per-visitor timeout, which should not be used
implicitly for setting the session timeout.
Diffstat (limited to 'documentapi')
3 files changed, 73 insertions, 16 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java index b81025c7286..f4a33034af8 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java @@ -23,6 +23,7 @@ public class VisitorParameters extends Parameters { private String visitorLibrary = "DumpVisitor"; private int maxPending = 32; private long timeoutMs = -1; + private long sessionTimeoutMs = -1; private long fromTimestamp = 0; private long toTimestamp = 0; boolean visitRemoves = false; @@ -113,9 +114,15 @@ public class VisitorParameters extends Parameters { /** @return The maximum number of messages each storage visitor will have pending before waiting for acks from client. */ public int getMaxPending() { return maxPending; } - /** @return The timeout for the visitor in milliseconds. */ + /** @return The timeout for each sent visitor operation in milliseconds. */ public long getTimeoutMs() { return timeoutMs; } + /** + * @return Session timeout in milliseconds, or -1 if not timeout has been set. -1 implies + * that session will run to completion without automatically timing out. + */ + public long getSessionTimeoutMs() { return sessionTimeoutMs; } + /** @return The minimum timestamp (in microsecs) of documents the visitor will visit. */ public long getFromTimestamp() { return fromTimestamp; } @@ -191,9 +198,19 @@ public class VisitorParameters extends Parameters { /** Set maximum pending messages one storage visitor will have pending to this client before stalling, waiting for acks. */ public void setMaxPending(int maxPending) { this.maxPending = maxPending; } - /** Set the timeout for the visitor in milliseconds. */ + /** Set the timeout for each visitor command in milliseconds. */ public void setTimeoutMs(long timeoutMs) { this.timeoutMs = timeoutMs; } + /** + * Sets timeout for the entire visiting session, in milliseconds. -1 implies infinity. + * + * If the session takes more time than this to complete, it will automatically + * be failed with CompletionCode.TIMEOUT. + * If no session timeout has been explicitly set (or it has been set to -1), visiting will + * continue until it completes or abort()/destroy() is called on the session instance. + */ + public void setSessionTimeoutMs(long timeoutMs) { this.sessionTimeoutMs = timeoutMs; } + /** Set from timestamp in microseconds. Documents put/updated before this timestamp will not be visited. */ public void setFromTimestamp(long timestamp) { fromTimestamp = timestamp; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index db1bb1f5f0d..7e9faa9a8ea 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -386,7 +386,7 @@ public class MessageBusVisitorSession implements VisitorSession { return; } transitionTo(new StateDescription(State.WORKING)); - taskExecutor.submitTask(new SendCreateVisitorsTask(sessionTimeoutMillis())); + taskExecutor.submitTask(new SendCreateVisitorsTask(computeBoundedMessageTimeoutMillis(0))); } } @@ -937,14 +937,29 @@ public class MessageBusVisitorSession implements VisitorSession { || enoughHitsReceived()); } + private long messageTimeoutMillis() { + return !isInfiniteTimeout(params.getTimeoutMs()) ? params.getTimeoutMs() : 5 * 60 * 1000; + } + private long sessionTimeoutMillis() { - return (params.getTimeoutMs() != -1) ? params.getTimeoutMs() : 5 * 60 * 1000; + return params.getSessionTimeoutMs(); } private long elapsedTimeMillis() { return TimeUnit.NANOSECONDS.toMillis(clock.monotonicNanoTime() - startTimeNanos); } + private static boolean isInfiniteTimeout(long timeoutMs) { + return timeoutMs < 0; + } + + private long computeBoundedMessageTimeoutMillis(long elapsedMs) { + final long messageTimeoutMs = messageTimeoutMillis(); + return !isInfiniteTimeout(sessionTimeoutMillis()) + ? Math.min(sessionTimeoutMillis() - elapsedMs, messageTimeoutMs) + : messageTimeoutMs; + } + /** * Schedule a new SendCreateVisitors task iff there are still buckets to * visit, the visiting has not failed fatally and we haven't already @@ -952,9 +967,8 @@ public class MessageBusVisitorSession implements VisitorSession { */ private void scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) { final long elapsedMillis = elapsedTimeMillis(); - final long timeoutMillis = sessionTimeoutMillis(); - if (elapsedMillis >= timeoutMillis) { - transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", timeoutMillis))); + if (!isInfiniteTimeout(sessionTimeoutMillis()) && (elapsedMillis >= sessionTimeoutMillis())) { + transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", sessionTimeoutMillis()))); if (visitingCompleted()) { markSessionCompleted(); } @@ -963,7 +977,7 @@ public class MessageBusVisitorSession implements VisitorSession { if (!mayScheduleCreateVisitorsTask()) { return; } - final long messageTimeoutMillis = timeoutMillis - elapsedMillis; + final long messageTimeoutMillis = computeBoundedMessageTimeoutMillis(elapsedMillis); taskExecutor.scheduleTask(new SendCreateVisitorsTask(messageTimeoutMillis), delay, unit); scheduledSendCreateVisitors = true; } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java index c6352639af5..e24c8c58436 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java @@ -2448,9 +2448,11 @@ public class MessageBusVisitorSessionTestCase { mc.receiver.repliesToString()); } - private MockComponents createTimeoutMocksAtInitialTime(long timeoutMillis, long currentTimeMillis, int maxPending) { + private MockComponents createTimeoutMocksAtInitialTime(long messageTimeoutMillis, long sessionTimeoutMillis, + long currentTimeMillis, int maxPending) { MockComponentsBuilder builder = new MockComponentsBuilder(); - builder.params.setTimeoutMs(timeoutMillis); + builder.params.setTimeoutMs(messageTimeoutMillis); + builder.params.setSessionTimeoutMs(sessionTimeoutMillis); builder.params.setControlHandler(builder.controlHandler); MockComponents mc = builder.createMockComponents(); mc.sender.setMaxPending(maxPending); @@ -2463,18 +2465,18 @@ public class MessageBusVisitorSessionTestCase { } @Test - public void visitor_command_timeout_set_to_remaining_session_timeout() { - MockComponents mc = createTimeoutMocksAtInitialTime(10_000, 10_000, 1); + public void visitor_command_timeout_set_to_min_of_message_timeout_and_remaining_session_timeout() { + MockComponents mc = createTimeoutMocksAtInitialTime(6_000, 10_000, 10_000, 1); // Superbucket 1 of 2 assertEquals("CreateVisitorMessage(buckets=[\n" + "BucketId(0x0400000000000000)\n" + "BucketId(0x0000000000000000)\n" + "]\n" + - "time remaining=10000\n)", + "time remaining=6000\n)", replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET)); - mc.clock.setMonotonicTime(14, TimeUnit.SECONDS); // 4 seconds elapsed from baseline + mc.clock.setMonotonicTime(15, TimeUnit.SECONDS); // 5 seconds elapsed from baseline mc.executor.expectAndProcessTasks(1); // reply mc.executor.expectAndProcessTasks(1); // send create visitors // Superbucket 2 of 2 @@ -2482,13 +2484,37 @@ public class MessageBusVisitorSessionTestCase { "BucketId(0x0400000000000001)\n" + "BucketId(0x0000000000000000)\n" + "]\n" + + "time remaining=5000\n)", // No timeout greater than 5s can be used, or session will have timed out + replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET)); + } + + @Test + public void infinite_session_timeout_does_not_affect_message_timeout() { + MockComponents mc = createTimeoutMocksAtInitialTime(6_000, -1, 10_000, 1); + + assertEquals("CreateVisitorMessage(buckets=[\n" + + "BucketId(0x0400000000000000)\n" + + "BucketId(0x0000000000000000)\n" + + "]\n" + "time remaining=6000\n)", replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET)); } @Test + public void message_timeout_greater_than_session_timeout_is_bounded() { + MockComponents mc = createTimeoutMocksAtInitialTime(6_000, 3_000, 10_000, 1); + + assertEquals("CreateVisitorMessage(buckets=[\n" + + "BucketId(0x0400000000000000)\n" + + "BucketId(0x0000000000000000)\n" + + "]\n" + + "time remaining=3000\n)", + replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET)); + } + + @Test public void fail_session_with_timeout_if_timeout_has_elapsed() { - MockComponents mc = createTimeoutMocksAtInitialTime(4_000, 20_000, 1); + MockComponents mc = createTimeoutMocksAtInitialTime(1_000, 4_000, 20_000, 1); replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2 mc.clock.setMonotonicTime(24_000, TimeUnit.MILLISECONDS); // 4 second timeout expired @@ -2505,7 +2531,7 @@ public class MessageBusVisitorSessionTestCase { @Test public void timeout_with_pending_messages_does_not_close_session_until_all_replies_received() { - MockComponents mc = createTimeoutMocksAtInitialTime(5_000, 20_000, 2); + MockComponents mc = createTimeoutMocksAtInitialTime(1_000, 5_000, 20_000, 2); assertEquals(2, mc.sender.getMessageCount()); |