diff options
Diffstat (limited to 'documentapi/src/main/java')
14 files changed, 439 insertions, 109 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java index 9f4ceaad37f..60f70a91338 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java @@ -7,6 +7,8 @@ import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentUpdate; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; + /** * <p>A session for asynchronous access to a document repository. * This class provides document repository writes and random access with high @@ -42,10 +44,11 @@ public interface AsyncSession extends Session { * If it was not a success, this method has no further effects.</p> * * @param document the Document to put + * @param priority the priority with which to send the operation * @return the synchronous result of this operation */ default Result put(Document document, DocumentProtocol.Priority priority) { - return put(new DocumentPut(document), priority); + return put(new DocumentPut(document), parameters().withPriority(priority)); } /** @@ -60,7 +63,7 @@ public interface AsyncSession extends Session { * @return the synchronous result of this operation */ default Result put(DocumentPut documentPut) { - return put(documentPut, DocumentProtocol.Priority.NORMAL_3); + return put(documentPut, parameters()); } /** @@ -72,10 +75,26 @@ public interface AsyncSession extends Session { * If it was not a success, this method has no further effects.</p> * * @param documentPut the DocumentPut to perform + * @param priority the priority with which to send the operation * @return the synchronous result of this operation */ - // TODO Vespa 8: Make this the one to implement. default Result put(DocumentPut documentPut, DocumentProtocol.Priority priority) { + return put(documentPut, parameters().withPriority(priority)); + } + + /** + * <p>Puts a document, with optional conditions on the operation. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentResponse} objects to appear within the timeout time of this session. + * The response returned later will either be a success, or contain the document submitted here. + * If it was not a success, this method has no further effects.</p> + * + * @param documentPut the DocumentPut to perform + * @param parameters parameters for the operation + * @return the synchronous result of this operation + */ + default Result put(DocumentPut documentPut, DocumentOperationParameters parameters) { return put(documentPut.getDocument()); } @@ -126,6 +145,23 @@ public interface AsyncSession extends Session { * @throws UnsupportedOperationException if this access implementation does not support retrieving */ default Result get(DocumentId id, DocumentProtocol.Priority priority) { + return get(id, parameters().withPriority(priority)); + } + + /** + * <p>Gets a document. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentResponse} objects to appear within the timeout time of this session. + * The response returned later will contain the requested document if it is a success. + * If it was not a success, this method has no further effects.</p> + * + * @param id the id of the document to get + * @param parameters parameters for the operation + * @return the synchronous result of this operation + * @throws UnsupportedOperationException if this access implementation does not support retrieving + */ + default Result get(DocumentId id, DocumentOperationParameters parameters) { return get(id); } @@ -158,6 +194,23 @@ public interface AsyncSession extends Session { * @throws UnsupportedOperationException if this access implementation does not support removal */ default Result remove(DocumentId id, DocumentProtocol.Priority priority) { + return remove(id, parameters().withPriority(priority)); + } + + /** + * <p>Removes a document if it is present. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentIdResponse} objects to appear within the timeout time of this session. + * The response returned later will either be a success, or contain the document id submitted here. + * If it was not a success, this method has no further effects.</p> + * + * @param id the id of the document to remove + * @param parameters parameters for the operation + * @return the synchronous result of this operation + * @throws UnsupportedOperationException if this access implementation does not support removal + */ + default Result remove(DocumentId id, DocumentOperationParameters parameters) { return remove(id); } @@ -189,6 +242,23 @@ public interface AsyncSession extends Session { * @throws UnsupportedOperationException if this access implementation does not support update */ default Result update(DocumentUpdate update, DocumentProtocol.Priority priority) { + return update(update, parameters().withPriority(priority)); + } + + /** + * <p>Updates a document. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentUpdateResponse} within the timeout time of this session. + * The returned response returned later will either be a success or contain the update submitted here. + * If it was not a success, this method has no further effects.</p> + * + * @param update the updates to perform + * @param parameters parameters for the operation + * @return the synchronous result of this operation + * @throws UnsupportedOperationException if this access implementation does not support update + */ + default Result update(DocumentUpdate update, DocumentOperationParameters parameters) { return update(update); } @@ -199,4 +269,5 @@ public interface AsyncSession extends Session { */ double getCurrentWindowSize(); + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java index 308eafcd596..1aa5c4c0df0 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java @@ -5,6 +5,7 @@ import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentTypeManagerConfigurer; import com.yahoo.document.select.parser.ParseException; import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess; /** * <p>This is the starting point of the <b>document api</b>. This api provides @@ -27,9 +28,9 @@ import com.yahoo.config.subscription.ConfigSubscriber; * <p>This class is the factory for creating the four session types mentioned above.</p> * * <p>There may be multiple implementations of the document api classes. If - * default configuration is sufficient, use the {@link #createDefault} method to - * return a running document access. Note that there are running threads within - * an access object, so you must shut it down when done.</p> + * default configuration is sufficient, simply inject a {@code DocumentAccess} to + * obtain a running document access. If you instead create a concrete implementation, note that + * there are running threads within an access object, so you must shut it down when done.</p> * * <p>An implementation of the Document Api may support just a subset of the * access types defined in this interface. For example, some document @@ -55,13 +56,29 @@ public abstract class DocumentAccess { * while attempting to create such an object, this method will throw an * exception. * - * @deprecated Inject a DocumentManagerConfig and create a MessageBusDocumentAccess from this instead. + * @deprecated DocumentAccess may be injected in containers — otherwise use {@link #createForNonContainer()}. * * @return a running document access object with all default configuration */ @Deprecated(since = "7") public static DocumentAccess createDefault() { - return new com.yahoo.documentapi.messagebus.MessageBusDocumentAccess(); + return new MessageBusDocumentAccess(); + } + + + /** + * This is a convenience method to return a document access object when running + * outside of a Vespa application container, with all default parameter values. + * The client that calls this method is also responsible for shutting the object + * down when done. If an error occurred while attempting to create such an object, + * this method will throw an exception. + * This document access requires new config subscriptions to be set up, which should + * be avoided in application containers, but is suitable for, e.g., CLIs. + * + * @return a running document access object with all default configuration + */ + public static DocumentAccess createForNonContainer() { + return new MessageBusDocumentAccess(); } /** diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java index 34ab47571cf..e4a44fb88cd 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java @@ -2,6 +2,7 @@ package com.yahoo.documentapi; import com.yahoo.document.DocumentId; +import com.yahoo.messagebus.Trace; /** * The asynchronous response to a document remove operation. @@ -61,7 +62,19 @@ public class DocumentIdResponse extends Response { * @param outcome the outcome of the operation */ public DocumentIdResponse(long requestId, DocumentId documentId, String textMessage, Outcome outcome) { - super(requestId, textMessage, outcome); + this(requestId, documentId, textMessage, outcome, null); + } + + + /** + * Creates a response containing a textual message and/or a document id + * + * @param documentId the DocumentId to encapsulate in the Response + * @param textMessage the message to encapsulate in the Response + * @param outcome the outcome of the operation + */ + public DocumentIdResponse(long requestId, DocumentId documentId, String textMessage, Outcome outcome, Trace trace) { + super(requestId, textMessage, outcome, null); this.documentId = documentId; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java new file mode 100644 index 00000000000..3258c2f5b2c --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java @@ -0,0 +1,71 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi; + +import com.yahoo.document.fieldset.FieldSet; +import com.yahoo.document.fieldset.FieldSetRepo; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; + +import java.util.Optional; +import java.util.OptionalInt; + +import static java.util.Objects.requireNonNull; + +/** + * Optional parameters for a document operation. Immutable class. + * + * @author jonmv + */ +public class DocumentOperationParameters { + + private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1); + + private final DocumentProtocol.Priority priority; + private final String fieldSet; + private final String route; + private final int traceLevel; + + private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, int traceLevel) { + this.priority = priority; + this.fieldSet = fieldSet; + this.route = route; + this.traceLevel = traceLevel; + } + + public static DocumentOperationParameters parameters() { + return empty; + } + + /** Sets the priority with which to perform an operation. */ + public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) { + return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel); + } + + /** Sets the field set used for retrieval. */ + public DocumentOperationParameters withFieldSet(FieldSet fieldSet) { + return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel); + } + + /** Sets the field set used for retrieval. */ + public DocumentOperationParameters withFieldSet(String fieldSet) { + return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel); + } + + /** Sets the route along which to send the operation. */ + public DocumentOperationParameters withRoute(String route) { + return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel); + } + + /** Sets the trace level for an operation. */ + public DocumentOperationParameters withTraceLevel(int traceLevel) { + if (traceLevel < 0 || traceLevel > 9) + throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)"); + + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel); + } + + public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); } + public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); } + public Optional<String> route() { return Optional.ofNullable(route); } + public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java index 172e5fd11c0..2a02b70d4fc 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java @@ -2,6 +2,7 @@ package com.yahoo.documentapi; import com.yahoo.document.Document; +import com.yahoo.messagebus.Trace; /** * The asynchronous response to a document put or get operation. @@ -25,30 +26,28 @@ public class DocumentResponse extends Response { * @param document the Document to encapsulate in the Response */ public DocumentResponse(long requestId, Document document) { - super(requestId); - this.document = document; + this(requestId, document, null); } /** - * Creates a response containing a textual message + * Creates a successful response containing a document * - * @param textMessage the message to encapsulate in the Response - * @param success true if the response represents a successful call + * @param document the Document to encapsulate in the Response */ - @Deprecated(since = "7") // TODO: Remove on Vespa 8 - public DocumentResponse(long requestId, String textMessage, boolean success) { - super(requestId, textMessage, success); - document = null; + public DocumentResponse(long requestId, Document document, Trace trace) { + this(requestId, document, null, document != null ? Outcome.SUCCESS : Outcome.NOT_FOUND, trace); } /** * Creates a response containing a textual message * * @param textMessage the message to encapsulate in the Response - * @param outcome the outcome of this operation + * @param success true if the response represents a successful call */ - public DocumentResponse(long requestId, String textMessage, Outcome outcome) { - this(requestId, null, textMessage, outcome); + @Deprecated(since = "7") // TODO: Remove on Vespa 8 + public DocumentResponse(long requestId, String textMessage, boolean success) { + super(requestId, textMessage, success ? Outcome.NOT_FOUND : Outcome.ERROR); + document = null; } /** @@ -72,7 +71,19 @@ public class DocumentResponse extends Response { * @param outcome the outcome of this operation */ public DocumentResponse(long requestId, Document document, String textMessage, Outcome outcome) { - super(requestId, textMessage, outcome); + this(requestId, document, textMessage, outcome, null); + } + + + /** + * Creates a response containing a textual message and/or a document + * + * @param document the Document to encapsulate in the Response + * @param textMessage the message to encapsulate in the Response + * @param outcome the outcome of this operation + */ + public DocumentResponse(long requestId, Document document, String textMessage, Outcome outcome, Trace trace) { + super(requestId, textMessage, outcome, trace); this.document = document; } @@ -84,6 +95,12 @@ public class DocumentResponse extends Response { */ public Document getDocument() { return document; } + @Override + public boolean isSuccess() { + // TODO: is it right that Get operations are successful without a result, in this API? + return super.isSuccess() || outcome() == Outcome.NOT_FOUND; + } + public int hashCode() { return super.hashCode() + (document == null ? 0 : document.hashCode()); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java index 3294c216d96..d34873aeaa6 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java @@ -2,6 +2,7 @@ package com.yahoo.documentapi; import com.yahoo.document.DocumentUpdate; +import com.yahoo.messagebus.Trace; /** * The asynchronous response to a document update operation. @@ -71,7 +72,19 @@ public class DocumentUpdateResponse extends Response { * @param outcome the outcome of this operation */ public DocumentUpdateResponse(long requestId, DocumentUpdate documentUpdate, String textMessage, Outcome outcome) { - super(requestId, textMessage, outcome); + this(requestId, documentUpdate, textMessage, outcome, null); + } + + + /** + * Creates a response containing a textual message and/or a document update + * + * @param documentUpdate the DocumentUpdate to encapsulate in the Response + * @param textMessage the message to encapsulate in the Response + * @param outcome the outcome of this operation + */ + public DocumentUpdateResponse(long requestId, DocumentUpdate documentUpdate, String textMessage, Outcome outcome, Trace trace) { + super(requestId, textMessage, outcome, trace); this.documentUpdate = documentUpdate; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java index 502588a3d5f..2d3f2934890 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi; +import com.yahoo.messagebus.Trace; + /** * This response is provided for successful document remove operations. Use the * wasFound() method to check whether or not the document was actually found. @@ -12,7 +14,11 @@ public class RemoveResponse extends Response { private final boolean wasFound; public RemoveResponse(long requestId, boolean wasFound) { - super(requestId); + this(requestId, wasFound, null); + } + + public RemoveResponse(long requestId, boolean wasFound, Trace trace) { + super(requestId, null, wasFound ? Outcome.SUCCESS : Outcome.NOT_FOUND, trace); this.wasFound = wasFound; } @@ -21,6 +27,10 @@ public class RemoveResponse extends Response { } @Override + // TODO: fix this when/if NOT_FOUND is no longer a success. + public boolean isSuccess() { return super.isSuccess() || outcome() == Outcome.NOT_FOUND; } + + @Override public int hashCode() { return super.hashCode() + Boolean.valueOf(wasFound).hashCode(); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Response.java b/documentapi/src/main/java/com/yahoo/documentapi/Response.java index 4c95a648949..0a541a92c6a 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/Response.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/Response.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi; +import com.yahoo.messagebus.Trace; + import java.util.Objects; import static com.yahoo.documentapi.Response.Outcome.ERROR; @@ -19,6 +21,7 @@ public class Response { private final long requestId; private final String textMessage; private final Outcome outcome; + private final Trace trace; /** Creates a successful response containing no information */ public Response(long requestId) { @@ -52,9 +55,20 @@ public class Response { * @param outcome the outcome of the operation */ public Response(long requestId, String textMessage, Outcome outcome) { + this(requestId, textMessage, outcome, null); + } + + /** + * Creates a response containing a textual message + * + * @param textMessage the message to encapsulate in the Response + * @param outcome the outcome of the operation + */ + public Response(long requestId, String textMessage, Outcome outcome, Trace trace) { this.requestId = requestId; this.textMessage = textMessage; this.outcome = outcome; + this.trace = trace; } /** @@ -76,6 +90,9 @@ public class Response { public long getRequestId() { return requestId; } + /** Returns the trace of this operation, or null if there is none. */ + public Trace getTrace() { return trace; } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java index 24fd47ed12c..1cee3249032 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java @@ -11,6 +11,8 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import java.time.Duration; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; + /** * A session for synchronous access to a document repository. This class * provides simple document access where throughput is not a concern. @@ -35,6 +37,18 @@ public interface SyncSession extends Session { * @param priority the priority with which to perform this operation */ default void put(DocumentPut documentPut, DocumentProtocol.Priority priority) { + put(documentPut, parameters().withPriority(priority)); + } + + /** + * Puts a document. When this method returns, the document is safely received. + * + * @param documentPut the DocumentPut operation + * @param parameters parameters for the operation + * + * @param documentPut the DocumentPut operation + */ + default void put(DocumentPut documentPut, DocumentOperationParameters parameters) { put(documentPut); } @@ -85,6 +99,20 @@ public interface SyncSession extends Session { Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority priority, Duration timeout); /** + * Gets a document with timeout. + * + * @param id the id of the document to get + * @param parameters parameters for the operation + * @param timeout timeout. If timeout is null, an unspecified default will be used + * @return the known document having this id, or null if there is no document having this id + * @throws UnsupportedOperationException thrown if this access does not support retrieving + * @throws DocumentAccessException on any messagebus error, including timeout ({@link com.yahoo.messagebus.ErrorCode#TIMEOUT}) + */ + default Document get(DocumentId id, DocumentOperationParameters parameters, Duration timeout) { + return get(id, timeout); + } + + /** * Removes a document if it is present and condition is fulfilled. * * @param documentRemove document to delete @@ -103,6 +131,18 @@ public interface SyncSession extends Session { boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority priority); /** + * Removes a document if it is present. + * + * @param documentRemove document remove operation + * @param parameters parameters for the operation + * @return true if the document with this id was removed, false otherwise. + * @throws UnsupportedOperationException thrown if this access does not support removal + */ + default boolean remove(DocumentRemove documentRemove, DocumentOperationParameters parameters) { + return remove(documentRemove); + } + + /** * Updates a document. * * @param update the updates to perform @@ -127,4 +167,19 @@ public interface SyncSession extends Session { */ boolean update(DocumentUpdate update, DocumentProtocol.Priority priority); + /** + * Updates a document. + * + * @param update the updates to perform. + * @param parameters parameters for the operation + * @return false if the updates could not be applied as the document does not exist and + * {@link DocumentUpdate#setCreateIfNonExistent(boolean) create-if-non-existent} is not set. + * @throws DocumentAccessException on update error, including but not limited to: 1. timeouts, + * 2. the document exists but the {@link DocumentUpdate#setCondition(TestAndSetCondition) condition} + * is not met. + */ + default boolean update(DocumentUpdate update, DocumentOperationParameters parameters) { + return update(update); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java index 96bf58c1e64..54c338dc6c9 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi; +import com.yahoo.messagebus.Trace; + /** * This response is provided for successful document update operations. Use the * wasFound() method to check whether or not the document was actually found. @@ -12,7 +14,11 @@ public class UpdateResponse extends Response { private final boolean wasFound; public UpdateResponse(long requestId, boolean wasFound) { - super(requestId); + this(requestId, wasFound, null); + } + + public UpdateResponse(long requestId, boolean wasFound, Trace trace) { + super(requestId, null, wasFound ? Outcome.SUCCESS : Outcome.NOT_FOUND, trace); this.wasFound = wasFound; } @@ -21,6 +27,10 @@ public class UpdateResponse extends Response { } @Override + // TODO: fix this when/if NOT_FOUND is no longer a success. + public boolean isSuccess() { return super.isSuccess() || outcome() == Outcome.NOT_FOUND; } + + @Override public int hashCode() { return super.hashCode() + Boolean.valueOf(wasFound).hashCode(); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 398675e594e..40f26a82a89 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -61,7 +61,7 @@ public class LocalAsyncSession implements AsyncSession { long req = getNextRequestId(); try { syncSession.put(documentPut, pri); - addResponse(new DocumentResponse(req)); + addResponse(new DocumentResponse(req, documentPut.getDocument())); } catch (Exception e) { addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR)); } @@ -85,7 +85,7 @@ public class LocalAsyncSession implements AsyncSession { try { addResponse(new DocumentResponse(req, syncSession.get(id))); } catch (Exception e) { - addResponse(new DocumentResponse(req, e.getMessage(), Response.Outcome.ERROR)); + addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR)); } return new Result(req); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index 7471d285db1..7a71089c180 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -9,6 +9,7 @@ import com.yahoo.document.fieldset.AllFields; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentIdResponse; +import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; import com.yahoo.documentapi.DocumentUpdateResponse; import com.yahoo.documentapi.RemoveResponse; @@ -41,9 +42,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; import static com.yahoo.documentapi.Response.Outcome.CONDITION_FAILED; import static com.yahoo.documentapi.Response.Outcome.ERROR; import static com.yahoo.documentapi.Response.Outcome.NOT_FOUND; +import static com.yahoo.documentapi.Response.Outcome.SUCCESS; /** * An access session which wraps a messagebus source session sending document messages. @@ -96,19 +99,24 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { @Override public Result put(Document document) { - return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3); + return put(new DocumentPut(document), parameters()); } @Override public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { + return put(documentPut, parameters().withPriority(pri)); + } + + @Override + public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) { PutDocumentMessage msg = new PutDocumentMessage(documentPut); - msg.setPriority(pri); - return send(msg); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_3)); + return send(msg, parameters); } @Override public Result get(DocumentId id) { - return get(id, DocumentProtocol.Priority.NORMAL_1); + return get(id, parameters()); } @Override @@ -119,35 +127,52 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { @Override public Result get(DocumentId id, DocumentProtocol.Priority pri) { - GetDocumentMessage msg = new GetDocumentMessage(id, AllFields.NAME); - msg.setPriority(pri); - return send(msg); + return get(id, parameters().withPriority(pri)); + } + + @Override + public Result get(DocumentId id, DocumentOperationParameters parameters) { + GetDocumentMessage msg = new GetDocumentMessage(id, parameters.fieldSet().orElse(AllFields.NAME)); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_1)); + return send(msg, parameters); } @Override public Result remove(DocumentId id) { - return remove(id, DocumentProtocol.Priority.NORMAL_2); + return remove(id, parameters()); } @Override public Result remove(DocumentId id, DocumentProtocol.Priority pri) { + return remove(id, parameters().withPriority(pri)); + } + + @Override + public Result remove(DocumentId id, DocumentOperationParameters parameters) { RemoveDocumentMessage msg = new RemoveDocumentMessage(id); - msg.setPriority(pri); - return send(msg); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2)); + return send(msg, parameters); } @Override public Result update(DocumentUpdate update) { - return update(update, DocumentProtocol.Priority.NORMAL_2); + return update(update, parameters()); } @Override public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) { + return update(update, parameters().withPriority(pri)); + } + + @Override + public Result update(DocumentUpdate update, DocumentOperationParameters parameters) { UpdateDocumentMessage msg = new UpdateDocumentMessage(update); - msg.setPriority(pri); - return send(msg); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2)); + return send(msg, parameters); } + // TODO jonmv: Was this done to remedy get route no longer being possible to set through doc/v1 after default-get was added? + // TODO jonmv: If so, this is no longer needed with doc/v1.1 and later. private boolean mayOverrideWithGetOnlyRoute(Message msg) { // Only allow implicitly overriding the default Get route if the message is attempted sent // with the default route originally. Otherwise it's reasonable to assume that the caller @@ -156,19 +181,13 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { && ("default".equals(route) || "route:default".equals(route))); } - /** - * A convenience method for assigning the internal trace level and route string to a message before sending it - * through the internal mbus session object. - * - * @param msg the message to send. - * @return the document api result object. - */ - public Result send(Message msg) { + Result send(Message msg, DocumentOperationParameters parameters) { try { long reqId = requestId.incrementAndGet(); msg.setContext(reqId); - msg.getTrace().setLevel(traceLevel); - String toRoute = (mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route); + msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel)); + // Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew! + String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route); if (toRoute != null) { return toResult(reqId, session.send(msg, toRoute, true)); } else { @@ -179,6 +198,17 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { } } + /** + * A convenience method for assigning the internal trace level and route string to a message before sending it + * through the internal mbus session object. + * + * @param msg the message to send. + * @return the document api result object. + */ + public Result send(Message msg) { + return send(msg, null); + } + @Override public Response getNext() { return responses.poll(); @@ -256,7 +286,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { } private static Response toResponse(Reply reply) { - long reqId = (Long)reply.getContext(); + long reqId = (Long) reply.getContext(); return reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId); } @@ -269,15 +299,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { String err = getErrorMessage(reply); switch (msg.getType()) { case DocumentProtocol.MESSAGE_PUTDOCUMENT: - return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, outcome); + return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, outcome, reply.getTrace()); case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: - return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, outcome); + return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, outcome, reply.getTrace()); case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: - return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, outcome); + return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, outcome, reply.getTrace()); case DocumentProtocol.MESSAGE_GETDOCUMENT: - return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, outcome); + return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, outcome, reply.getTrace()); default: - return new Response(reqId, err, outcome); + return new Response(reqId, err, outcome, reply.getTrace()); } } @@ -289,26 +319,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { if (getDoc != null) { getDoc.setLastModified(docReply.getLastModified()); } - return new DocumentResponse(reqId, getDoc); + return new DocumentResponse(reqId, getDoc, reply.getTrace()); case DocumentProtocol.REPLY_REMOVEDOCUMENT: - return new RemoveResponse(reqId, ((RemoveDocumentReply)reply).wasFound()); + return new RemoveResponse(reqId, ((RemoveDocumentReply)reply).wasFound(), reply.getTrace()); case DocumentProtocol.REPLY_UPDATEDOCUMENT: - return new UpdateResponse(reqId, ((UpdateDocumentReply)reply).wasFound()); + return new UpdateResponse(reqId, ((UpdateDocumentReply)reply).wasFound(), reply.getTrace()); case DocumentProtocol.REPLY_PUTDOCUMENT: - break; - default: - return new Response(reqId); - } - Message msg = reply.getMessage(); - switch (msg.getType()) { - case DocumentProtocol.MESSAGE_PUTDOCUMENT: - return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument()); - case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: - return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId()); - case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: - return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate()); + return new DocumentResponse(reqId, ((PutDocumentMessage)reply.getMessage()).getDocumentPut().getDocument(), reply.getTrace()); default: - return new Response(reqId); + return new Response(reqId, null, SUCCESS, reply.getTrace()); } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java index ff891dbb298..9832529c157 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java @@ -20,6 +20,8 @@ import com.yahoo.messagebus.network.local.LocalNetwork; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; /** * This class implements the {@link DocumentAccess} interface using message bus for communication. @@ -29,6 +31,8 @@ import java.util.concurrent.ScheduledExecutorService; */ public class MessageBusDocumentAccess extends DocumentAccess { + private static final Logger log = Logger.getLogger(MessageBusDocumentAccess.class.getName()); + private final NetworkMessageBus bus; private final MessageBusParams params; @@ -60,7 +64,12 @@ public class MessageBusDocumentAccess extends DocumentAccess { bus = new NetworkMessageBus(network, new MessageBus(network, mbusParams)); } else { - bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId()); + if (params.getRPCNetworkParams().getSlobroksConfig() != null && mbusParams.getMessageBusConfig() != null) + bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams()); + else { + log.log(Level.FINE, () -> "Setting up self-subscription to config because explicit config was missing; try to avoid this in containers"); + bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId()); + } } } catch (Exception e) { @@ -92,7 +101,7 @@ public class MessageBusDocumentAccess extends DocumentAccess { @Override public MessageBusVisitorSession createVisitorSession(VisitorParameters params) throws ParseException, IllegalArgumentException { MessageBusVisitorSession session = MessageBusVisitorSession.createForMessageBus( - bus.getMessageBus(), scheduledExecutorService, params); + messageBus(), scheduledExecutorService, params); session.start(); return session; } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java index a9621950b88..c7ab8a23e11 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java @@ -9,6 +9,7 @@ import com.yahoo.document.DocumentUpdate; import com.yahoo.document.fieldset.AllFields; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.DocumentAccessException; +import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.Response; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.SyncParameters; @@ -28,6 +29,8 @@ import com.yahoo.messagebus.ReplyHandler; import java.time.Duration; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; + /** * An implementation of the SyncSession interface running over message bus. * @@ -84,10 +87,14 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re * @return The reply received. */ public Reply syncSend(Message msg) { - return syncSend(msg, defaultTimeout); + return syncSend(msg, parameters()); + } + + private Reply syncSend(Message msg, DocumentOperationParameters parameters) { + return syncSend(msg, defaultTimeout, parameters()); } - private Reply syncSend(Message msg, Duration timeout) { + private Reply syncSend(Message msg, Duration timeout, DocumentOperationParameters parameters) { if (timeout != null) { msg.setTimeRemaining(timeout.toMillis()); } @@ -97,7 +104,7 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re msg.pushHandler(this); // store monitor Result result = null; while (result == null || result.type() == Result.ResultType.TRANSIENT_ERROR) { - result = session.send(msg); + result = session.send(msg, parameters); if (result != null && result.isSuccess()) { break; } @@ -114,37 +121,40 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re @Override public void put(DocumentPut documentPut) { - put(documentPut, DocumentProtocol.Priority.NORMAL_3); + put(documentPut, parameters()); } @Override public void put(DocumentPut documentPut, DocumentProtocol.Priority priority) { - PutDocumentMessage msg = new PutDocumentMessage(documentPut); - msg.setPriority(priority); - syncSendPutDocumentMessage(msg); + put(documentPut, parameters().withPriority(priority)); } @Override - public Document get(DocumentId id) { - return get(id, null); + public void put(DocumentPut documentPut, DocumentOperationParameters parameters) { + PutDocumentMessage msg = new PutDocumentMessage(documentPut); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_3)); + Reply reply = syncSend(msg, parameters); + if (reply.hasErrors()) { + throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), reply.getErrorCodes()); + } } @Override - public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri) { - return get(id, fieldSet, pri, null); + public Document get(DocumentId id, Duration timeout) { + return get(id, parameters(), timeout); } @Override - public Document get(DocumentId id, Duration timeout) { - return get(id, AllFields.NAME, DocumentProtocol.Priority.NORMAL_1, timeout); + public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri, Duration timeout) { + return get(id, parameters().withFieldSet(fieldSet).withPriority(pri), timeout); } @Override - public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri, Duration timeout) { - GetDocumentMessage msg = new GetDocumentMessage(id, fieldSet); - msg.setPriority(pri); + public Document get(DocumentId id, DocumentOperationParameters parameters, Duration timeout) { + GetDocumentMessage msg = new GetDocumentMessage(id, parameters.fieldSet().orElse(AllFields.NAME)); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_1)); - Reply reply = syncSend(msg, timeout != null ? timeout : defaultTimeout); + Reply reply = syncSend(msg, timeout != null ? timeout : defaultTimeout, parameters); if (reply.hasErrors()) { throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply)); } @@ -161,21 +171,20 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re @Override public boolean remove(DocumentRemove documentRemove) { - RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId()); - msg.setCondition(documentRemove.getCondition()); - return remove(msg); + return remove(documentRemove, parameters()); } @Override public boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority pri) { - RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId()); - msg.setPriority(pri); - msg.setCondition(documentRemove.getCondition()); - return remove(msg); + return remove(documentRemove, parameters().withPriority(pri)); } - private boolean remove(RemoveDocumentMessage msg) { - Reply reply = syncSend(msg); + @Override + public boolean remove(DocumentRemove documentRemove, DocumentOperationParameters parameters) { + RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId()); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2)); + msg.setCondition(documentRemove.getCondition()); + Reply reply = syncSend(msg, parameters); if (reply.hasErrors()) { throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply)); } @@ -187,14 +196,19 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re @Override public boolean update(DocumentUpdate update) { - return update(update, DocumentProtocol.Priority.NORMAL_2); + return update(update, parameters()); } @Override public boolean update(DocumentUpdate update, DocumentProtocol.Priority pri) { + return update(update, parameters().withPriority(pri)); + } + + @Override + public boolean update(DocumentUpdate update, DocumentOperationParameters parameters) { UpdateDocumentMessage msg = new UpdateDocumentMessage(update); - msg.setPriority(pri); - Reply reply = syncSend(msg); + msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2)); + Reply reply = syncSend(msg, parameters); if (reply.hasErrors()) { throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), reply.getErrorCodes()); } @@ -243,10 +257,4 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re } } - private void syncSendPutDocumentMessage(PutDocumentMessage putDocumentMessage) { - Reply reply = syncSend(putDocumentMessage); - if (reply.hasErrors()) { - throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), reply.getErrorCodes()); - } - } } |