aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-05-19 16:08:16 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-05-19 16:08:16 +0200
commit1f9a26d7699ecf65ae85bbb0da424550174aa662 (patch)
treefef34d43c67a982280aa7f975766235f140afc60 /documentapi
parent0b8987105250d81308c172816b5c41b13a135b24 (diff)
Add deadline to DocumentOperationParameters and use in MessageBusAsyncSession
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java25
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/Response.java3
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/Result.java2
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java8
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java3
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java37
6 files changed, 67 insertions, 11 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
index 1d934680586..fa38312582e 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
@@ -5,6 +5,7 @@ import com.yahoo.document.fieldset.FieldSet;
import com.yahoo.document.fieldset.FieldSetRepo;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
@@ -18,20 +19,22 @@ import static java.util.Objects.requireNonNull;
*/
public class DocumentOperationParameters {
- private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null);
+ private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null, null);
private final DocumentProtocol.Priority priority;
private final String fieldSet;
private final String route;
private final int traceLevel;
+ private final Instant deadline;
private final ResponseHandler responseHandler;
private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route,
- int traceLevel, ResponseHandler responseHandler) {
+ int traceLevel, Instant deadline, ResponseHandler responseHandler) {
this.priority = priority;
this.fieldSet = fieldSet;
this.route = route;
this.traceLevel = traceLevel;
+ this.deadline = deadline;
this.responseHandler = responseHandler;
}
@@ -41,22 +44,22 @@ public class DocumentOperationParameters {
/** Sets the priority with which to perform an operation. */
public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) {
- return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, responseHandler);
+ return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, deadline, responseHandler);
}
/** Sets the field set used for retrieval. */
public DocumentOperationParameters withFieldSet(FieldSet fieldSet) {
- return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, responseHandler);
+ return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, deadline, responseHandler);
}
/** Sets the field set used for retrieval. */
public DocumentOperationParameters withFieldSet(String fieldSet) {
- return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, responseHandler);
+ return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, deadline, responseHandler);
}
/** Sets the route along which to send the operation. */
public DocumentOperationParameters withRoute(String route) {
- return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, responseHandler);
+ return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, deadline, responseHandler);
}
/** Sets the trace level for an operation. */
@@ -64,18 +67,24 @@ public class DocumentOperationParameters {
if (traceLevel < 0 || traceLevel > 9)
throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)");
- return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, responseHandler);
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, deadline, responseHandler);
+ }
+
+ /** Sets the deadline for an operation. */
+ public DocumentOperationParameters withDeadline(Instant deadline) {
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(deadline), responseHandler);
}
/** Sets the {@link ResponseHandler} to handle the {@link Response} of an async operation, instead of the session default. */
public DocumentOperationParameters withResponseHandler(ResponseHandler responseHandler) {
- return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(responseHandler));
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, deadline, requireNonNull(responseHandler));
}
public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); }
public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); }
public Optional<String> route() { return Optional.ofNullable(route); }
public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); }
+ public Optional<Instant> deadline() { return Optional.ofNullable(deadline); }
public Optional<ResponseHandler> responseHandler() { return Optional.ofNullable(responseHandler); }
@Override
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Response.java b/documentapi/src/main/java/com/yahoo/documentapi/Response.java
index cea9f247ade..4e4e038e3fc 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/Response.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/Response.java
@@ -127,6 +127,9 @@ public class Response {
/** The operation failed because the cluster had insufficient storage to accept it. */
INSUFFICIENT_STORAGE,
+ /** The operation timed out before it reached its destination. */
+ TIMEOUT,
+
/** The operation failed for some unknown reason. */
ERROR
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Result.java b/documentapi/src/main/java/com/yahoo/documentapi/Result.java
index 9b77090ea6d..38c49873d9b 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/Result.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/Result.java
@@ -82,7 +82,7 @@ public class Result {
/** The request failed, and retrying is pointless. */
FATAL_ERROR,
/** Condition specified in operation not met error */
- @Deprecated(since = "7", forRemoval = true) // TODO: Remove on Vespa 8 — this is a Response outcome, not a Result outcome.
+ @Deprecated(since = "7", forRemoval = true) // TODO: Remove on Vespa 8 — this is a Response outcome, not a Result outcome.
CONDITION_NOT_MET_ERROR
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
index 5def71e2d81..1da6f8bb472 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
@@ -26,7 +26,6 @@ import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentReply;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply;
-import java.util.logging.Level;
import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBus;
@@ -36,11 +35,14 @@ import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
@@ -49,6 +51,7 @@ import static com.yahoo.documentapi.Response.Outcome.ERROR;
import static com.yahoo.documentapi.Response.Outcome.INSUFFICIENT_STORAGE;
import static com.yahoo.documentapi.Response.Outcome.NOT_FOUND;
import static com.yahoo.documentapi.Response.Outcome.SUCCESS;
+import static com.yahoo.documentapi.Response.Outcome.TIMEOUT;
/**
* An access session which wraps a messagebus source session sending document messages.
@@ -168,6 +171,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
long reqId = requestId.incrementAndGet();
msg.setContext(new OperationContext(reqId, parameters.responseHandler().orElse(null)));
msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel));
+ parameters.deadline().ifPresent(deadline -> msg.setTimeRemaining(Math.max(1, Duration.between(Instant.now(), deadline).toMillis())));
// Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew!
String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route);
if (toRoute != null) {
@@ -284,6 +288,8 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
if ( reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound()
|| reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound())
return NOT_FOUND;
+ if (reply.getErrorCodes().contains(ErrorCode.TIMEOUT))
+ return TIMEOUT;
return ERROR;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java
index fa8164be700..8aada611d80 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java
@@ -28,6 +28,7 @@ import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import java.time.Duration;
+import java.time.Instant;
import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
@@ -96,7 +97,7 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re
private Reply syncSend(Message msg, Duration timeout, DocumentOperationParameters parameters) {
if (timeout != null) {
- msg.setTimeRemaining(timeout.toMillis());
+ parameters = parameters.withDeadline(Instant.now().plus(timeout));
}
try {
RequestMonitor monitor = new RequestMonitor();
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java
index 4ba51f3b3d8..78e4e49b78d 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java
@@ -1,9 +1,17 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.test;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.Response;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
@@ -22,6 +30,13 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import java.time.Instant;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@@ -96,4 +111,26 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase {
// TODO(vekterli): test remote-to-local message sending as well?
// TODO(vekterli): test DocumentAccess shutdown during active ession?
}
+
+ @Test
+ public void requireThatTimeoutWorks() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Response> response = new AtomicReference<>();
+ AsyncSession session = access().createAsyncSession(new AsyncParameters());
+ DocumentType type = access().getDocumentTypeManager().getDocumentType("music");
+ Document doc1 = new Document(type, new DocumentId("id:ns:music::1"));
+ assertTrue(session.put(new DocumentPut(doc1),
+ DocumentOperationParameters.parameters()
+ .withResponseHandler(result -> {
+ response.set(result);
+ latch.countDown();
+ })
+ .withDeadline(Instant.now().minusSeconds(1)))
+ .isSuccess());
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+ assertNotNull(response.get());
+ assertEquals(Response.Outcome.TIMEOUT, response.get().outcome());
+ session.destroy();
+ }
+
}