summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-01-19 16:03:30 +0100
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-01-19 16:03:30 +0100
commit705bbb4ce9446b02ae097b9dc43e53b7637abe8a (patch)
treefdb7de74ed2c4ccef00473656fa32e33681d0a92 /documentapi
parentc9d2c693573606fa7cafd6a90bc5b0f1a07ffde5 (diff)
Add separate session-wide visitor timeout parameter
This augments today's per-visitor timeout, which should not be used implicitly for setting the session timeout.
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java21
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java26
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java42
3 files changed, 73 insertions, 16 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
index b81025c7286..f4a33034af8 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorParameters.java
@@ -23,6 +23,7 @@ public class VisitorParameters extends Parameters {
private String visitorLibrary = "DumpVisitor";
private int maxPending = 32;
private long timeoutMs = -1;
+ private long sessionTimeoutMs = -1;
private long fromTimestamp = 0;
private long toTimestamp = 0;
boolean visitRemoves = false;
@@ -113,9 +114,15 @@ public class VisitorParameters extends Parameters {
/** @return The maximum number of messages each storage visitor will have pending before waiting for acks from client. */
public int getMaxPending() { return maxPending; }
- /** @return The timeout for the visitor in milliseconds. */
+ /** @return The timeout for each sent visitor operation in milliseconds. */
public long getTimeoutMs() { return timeoutMs; }
+ /**
+ * @return Session timeout in milliseconds, or -1 if not timeout has been set. -1 implies
+ * that session will run to completion without automatically timing out.
+ */
+ public long getSessionTimeoutMs() { return sessionTimeoutMs; }
+
/** @return The minimum timestamp (in microsecs) of documents the visitor will visit. */
public long getFromTimestamp() { return fromTimestamp; }
@@ -191,9 +198,19 @@ public class VisitorParameters extends Parameters {
/** Set maximum pending messages one storage visitor will have pending to this client before stalling, waiting for acks. */
public void setMaxPending(int maxPending) { this.maxPending = maxPending; }
- /** Set the timeout for the visitor in milliseconds. */
+ /** Set the timeout for each visitor command in milliseconds. */
public void setTimeoutMs(long timeoutMs) { this.timeoutMs = timeoutMs; }
+ /**
+ * Sets timeout for the entire visiting session, in milliseconds. -1 implies infinity.
+ *
+ * If the session takes more time than this to complete, it will automatically
+ * be failed with CompletionCode.TIMEOUT.
+ * If no session timeout has been explicitly set (or it has been set to -1), visiting will
+ * continue until it completes or abort()/destroy() is called on the session instance.
+ */
+ public void setSessionTimeoutMs(long timeoutMs) { this.sessionTimeoutMs = timeoutMs; }
+
/** Set from timestamp in microseconds. Documents put/updated before this timestamp will not be visited. */
public void setFromTimestamp(long timestamp) { fromTimestamp = timestamp; }
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
index db1bb1f5f0d..7e9faa9a8ea 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java
@@ -386,7 +386,7 @@ public class MessageBusVisitorSession implements VisitorSession {
return;
}
transitionTo(new StateDescription(State.WORKING));
- taskExecutor.submitTask(new SendCreateVisitorsTask(sessionTimeoutMillis()));
+ taskExecutor.submitTask(new SendCreateVisitorsTask(computeBoundedMessageTimeoutMillis(0)));
}
}
@@ -937,14 +937,29 @@ public class MessageBusVisitorSession implements VisitorSession {
|| enoughHitsReceived());
}
+ private long messageTimeoutMillis() {
+ return !isInfiniteTimeout(params.getTimeoutMs()) ? params.getTimeoutMs() : 5 * 60 * 1000;
+ }
+
private long sessionTimeoutMillis() {
- return (params.getTimeoutMs() != -1) ? params.getTimeoutMs() : 5 * 60 * 1000;
+ return params.getSessionTimeoutMs();
}
private long elapsedTimeMillis() {
return TimeUnit.NANOSECONDS.toMillis(clock.monotonicNanoTime() - startTimeNanos);
}
+ private static boolean isInfiniteTimeout(long timeoutMs) {
+ return timeoutMs < 0;
+ }
+
+ private long computeBoundedMessageTimeoutMillis(long elapsedMs) {
+ final long messageTimeoutMs = messageTimeoutMillis();
+ return !isInfiniteTimeout(sessionTimeoutMillis())
+ ? Math.min(sessionTimeoutMillis() - elapsedMs, messageTimeoutMs)
+ : messageTimeoutMs;
+ }
+
/**
* Schedule a new SendCreateVisitors task iff there are still buckets to
* visit, the visiting has not failed fatally and we haven't already
@@ -952,9 +967,8 @@ public class MessageBusVisitorSession implements VisitorSession {
*/
private void scheduleSendCreateVisitorsIfApplicable(long delay, TimeUnit unit) {
final long elapsedMillis = elapsedTimeMillis();
- final long timeoutMillis = sessionTimeoutMillis();
- if (elapsedMillis >= timeoutMillis) {
- transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", timeoutMillis)));
+ if (!isInfiniteTimeout(sessionTimeoutMillis()) && (elapsedMillis >= sessionTimeoutMillis())) {
+ transitionTo(new StateDescription(State.TIMED_OUT, String.format("Session timeout of %d ms expired", sessionTimeoutMillis())));
if (visitingCompleted()) {
markSessionCompleted();
}
@@ -963,7 +977,7 @@ public class MessageBusVisitorSession implements VisitorSession {
if (!mayScheduleCreateVisitorsTask()) {
return;
}
- final long messageTimeoutMillis = timeoutMillis - elapsedMillis;
+ final long messageTimeoutMillis = computeBoundedMessageTimeoutMillis(elapsedMillis);
taskExecutor.scheduleTask(new SendCreateVisitorsTask(messageTimeoutMillis), delay, unit);
scheduledSendCreateVisitors = true;
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
index c6352639af5..e24c8c58436 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusVisitorSessionTestCase.java
@@ -2448,9 +2448,11 @@ public class MessageBusVisitorSessionTestCase {
mc.receiver.repliesToString());
}
- private MockComponents createTimeoutMocksAtInitialTime(long timeoutMillis, long currentTimeMillis, int maxPending) {
+ private MockComponents createTimeoutMocksAtInitialTime(long messageTimeoutMillis, long sessionTimeoutMillis,
+ long currentTimeMillis, int maxPending) {
MockComponentsBuilder builder = new MockComponentsBuilder();
- builder.params.setTimeoutMs(timeoutMillis);
+ builder.params.setTimeoutMs(messageTimeoutMillis);
+ builder.params.setSessionTimeoutMs(sessionTimeoutMillis);
builder.params.setControlHandler(builder.controlHandler);
MockComponents mc = builder.createMockComponents();
mc.sender.setMaxPending(maxPending);
@@ -2463,18 +2465,18 @@ public class MessageBusVisitorSessionTestCase {
}
@Test
- public void visitor_command_timeout_set_to_remaining_session_timeout() {
- MockComponents mc = createTimeoutMocksAtInitialTime(10_000, 10_000, 1);
+ public void visitor_command_timeout_set_to_min_of_message_timeout_and_remaining_session_timeout() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(6_000, 10_000, 10_000, 1);
// Superbucket 1 of 2
assertEquals("CreateVisitorMessage(buckets=[\n" +
"BucketId(0x0400000000000000)\n" +
"BucketId(0x0000000000000000)\n" +
"]\n" +
- "time remaining=10000\n)",
+ "time remaining=6000\n)",
replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
- mc.clock.setMonotonicTime(14, TimeUnit.SECONDS); // 4 seconds elapsed from baseline
+ mc.clock.setMonotonicTime(15, TimeUnit.SECONDS); // 5 seconds elapsed from baseline
mc.executor.expectAndProcessTasks(1); // reply
mc.executor.expectAndProcessTasks(1); // send create visitors
// Superbucket 2 of 2
@@ -2482,13 +2484,37 @@ public class MessageBusVisitorSessionTestCase {
"BucketId(0x0400000000000001)\n" +
"BucketId(0x0000000000000000)\n" +
"]\n" +
+ "time remaining=5000\n)", // No timeout greater than 5s can be used, or session will have timed out
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
+ }
+
+ @Test
+ public void infinite_session_timeout_does_not_affect_message_timeout() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(6_000, -1, 10_000, 1);
+
+ assertEquals("CreateVisitorMessage(buckets=[\n" +
+ "BucketId(0x0400000000000000)\n" +
+ "BucketId(0x0000000000000000)\n" +
+ "]\n" +
"time remaining=6000\n)",
replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
}
@Test
+ public void message_timeout_greater_than_session_timeout_is_bounded() {
+ MockComponents mc = createTimeoutMocksAtInitialTime(6_000, 3_000, 10_000, 1);
+
+ assertEquals("CreateVisitorMessage(buckets=[\n" +
+ "BucketId(0x0400000000000000)\n" +
+ "BucketId(0x0000000000000000)\n" +
+ "]\n" +
+ "time remaining=3000\n)",
+ replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET));
+ }
+
+ @Test
public void fail_session_with_timeout_if_timeout_has_elapsed() {
- MockComponents mc = createTimeoutMocksAtInitialTime(4_000, 20_000, 1);
+ MockComponents mc = createTimeoutMocksAtInitialTime(1_000, 4_000, 20_000, 1);
replyToCreateVisitor(mc.sender, ProgressToken.FINISHED_BUCKET); // Super bucket 1 of 2
mc.clock.setMonotonicTime(24_000, TimeUnit.MILLISECONDS); // 4 second timeout expired
@@ -2505,7 +2531,7 @@ public class MessageBusVisitorSessionTestCase {
@Test
public void timeout_with_pending_messages_does_not_close_session_until_all_replies_received() {
- MockComponents mc = createTimeoutMocksAtInitialTime(5_000, 20_000, 2);
+ MockComponents mc = createTimeoutMocksAtInitialTime(1_000, 5_000, 20_000, 2);
assertEquals(2, mc.sender.getMessageCount());