diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-05-19 16:08:16 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-05-19 16:08:16 +0200 |
commit | 1f9a26d7699ecf65ae85bbb0da424550174aa662 (patch) | |
tree | fef34d43c67a982280aa7f975766235f140afc60 /documentapi/src | |
parent | 0b8987105250d81308c172816b5c41b13a135b24 (diff) |
Add deadline to DocumentOperationParameters and use in MessageBusAsyncSession
Diffstat (limited to 'documentapi/src')
6 files changed, 67 insertions, 11 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java index 1d934680586..fa38312582e 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java @@ -5,6 +5,7 @@ import com.yahoo.document.fieldset.FieldSet; import com.yahoo.document.fieldset.FieldSetRepo; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import java.time.Instant; import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; @@ -18,20 +19,22 @@ import static java.util.Objects.requireNonNull; */ public class DocumentOperationParameters { - private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null); + private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null, null); private final DocumentProtocol.Priority priority; private final String fieldSet; private final String route; private final int traceLevel; + private final Instant deadline; private final ResponseHandler responseHandler; private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, - int traceLevel, ResponseHandler responseHandler) { + int traceLevel, Instant deadline, ResponseHandler responseHandler) { this.priority = priority; this.fieldSet = fieldSet; this.route = route; this.traceLevel = traceLevel; + this.deadline = deadline; this.responseHandler = responseHandler; } @@ -41,22 +44,22 @@ public class DocumentOperationParameters { /** Sets the priority with which to perform an operation. */ public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) { - return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, responseHandler); + return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, deadline, responseHandler); } /** Sets the field set used for retrieval. */ public DocumentOperationParameters withFieldSet(FieldSet fieldSet) { - return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, responseHandler); + return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, deadline, responseHandler); } /** Sets the field set used for retrieval. */ public DocumentOperationParameters withFieldSet(String fieldSet) { - return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, responseHandler); + return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, deadline, responseHandler); } /** Sets the route along which to send the operation. */ public DocumentOperationParameters withRoute(String route) { - return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, responseHandler); + return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, deadline, responseHandler); } /** Sets the trace level for an operation. */ @@ -64,18 +67,24 @@ public class DocumentOperationParameters { if (traceLevel < 0 || traceLevel > 9) throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)"); - return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, responseHandler); + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, deadline, responseHandler); + } + + /** Sets the deadline for an operation. */ + public DocumentOperationParameters withDeadline(Instant deadline) { + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(deadline), responseHandler); } /** Sets the {@link ResponseHandler} to handle the {@link Response} of an async operation, instead of the session default. */ public DocumentOperationParameters withResponseHandler(ResponseHandler responseHandler) { - return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(responseHandler)); + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, deadline, requireNonNull(responseHandler)); } public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); } public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); } public Optional<String> route() { return Optional.ofNullable(route); } public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); } + public Optional<Instant> deadline() { return Optional.ofNullable(deadline); } public Optional<ResponseHandler> responseHandler() { return Optional.ofNullable(responseHandler); } @Override diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Response.java b/documentapi/src/main/java/com/yahoo/documentapi/Response.java index cea9f247ade..4e4e038e3fc 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/Response.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/Response.java @@ -127,6 +127,9 @@ public class Response { /** The operation failed because the cluster had insufficient storage to accept it. */ INSUFFICIENT_STORAGE, + /** The operation timed out before it reached its destination. */ + TIMEOUT, + /** The operation failed for some unknown reason. */ ERROR diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Result.java b/documentapi/src/main/java/com/yahoo/documentapi/Result.java index 9b77090ea6d..38c49873d9b 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/Result.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/Result.java @@ -82,7 +82,7 @@ public class Result { /** The request failed, and retrying is pointless. */ FATAL_ERROR, /** Condition specified in operation not met error */ - @Deprecated(since = "7", forRemoval = true) // TODO: Remove on Vespa 8 — this is a Response outcome, not a Result outcome. + @Deprecated(since = "7", forRemoval = true) // TODO: Remove on Vespa 8 — this is a Response outcome, not a Result outcome. CONDITION_NOT_MET_ERROR } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index 5def71e2d81..1da6f8bb472 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -26,7 +26,6 @@ import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentReply; import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentReply; -import java.util.logging.Level; import com.yahoo.messagebus.ErrorCode; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBus; @@ -36,11 +35,14 @@ import com.yahoo.messagebus.SourceSession; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; +import java.time.Duration; +import java.time.Instant; import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.documentapi.DocumentOperationParameters.parameters; @@ -49,6 +51,7 @@ import static com.yahoo.documentapi.Response.Outcome.ERROR; import static com.yahoo.documentapi.Response.Outcome.INSUFFICIENT_STORAGE; import static com.yahoo.documentapi.Response.Outcome.NOT_FOUND; import static com.yahoo.documentapi.Response.Outcome.SUCCESS; +import static com.yahoo.documentapi.Response.Outcome.TIMEOUT; /** * An access session which wraps a messagebus source session sending document messages. @@ -168,6 +171,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { long reqId = requestId.incrementAndGet(); msg.setContext(new OperationContext(reqId, parameters.responseHandler().orElse(null))); msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel)); + parameters.deadline().ifPresent(deadline -> msg.setTimeRemaining(Math.max(1, Duration.between(Instant.now(), deadline).toMillis()))); // Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew! String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route); if (toRoute != null) { @@ -284,6 +288,8 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { if ( reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound() || reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound()) return NOT_FOUND; + if (reply.getErrorCodes().contains(ErrorCode.TIMEOUT)) + return TIMEOUT; return ERROR; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java index fa8164be700..8aada611d80 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java @@ -28,6 +28,7 @@ import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.ReplyHandler; import java.time.Duration; +import java.time.Instant; import static com.yahoo.documentapi.DocumentOperationParameters.parameters; @@ -96,7 +97,7 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re private Reply syncSend(Message msg, Duration timeout, DocumentOperationParameters parameters) { if (timeout != null) { - msg.setTimeRemaining(timeout.toMillis()); + parameters = parameters.withDeadline(Instant.now().plus(timeout)); } try { RequestMonitor monitor = new RequestMonitor(); diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java index 4ba51f3b3d8..78e4e49b78d 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/test/MessageBusDocumentApiTestCase.java @@ -1,9 +1,17 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.test; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.AsyncParameters; +import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.Response; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; @@ -22,6 +30,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; /** @@ -96,4 +111,26 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase { // TODO(vekterli): test remote-to-local message sending as well? // TODO(vekterli): test DocumentAccess shutdown during active ession? } + + @Test + public void requireThatTimeoutWorks() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + AtomicReference<Response> response = new AtomicReference<>(); + AsyncSession session = access().createAsyncSession(new AsyncParameters()); + DocumentType type = access().getDocumentTypeManager().getDocumentType("music"); + Document doc1 = new Document(type, new DocumentId("id:ns:music::1")); + assertTrue(session.put(new DocumentPut(doc1), + DocumentOperationParameters.parameters() + .withResponseHandler(result -> { + response.set(result); + latch.countDown(); + }) + .withDeadline(Instant.now().minusSeconds(1))) + .isSuccess()); + assertTrue(latch.await(60, TimeUnit.SECONDS)); + assertNotNull(response.get()); + assertEquals(Response.Outcome.TIMEOUT, response.get().outcome()); + session.destroy(); + } + } |