summaryrefslogtreecommitdiffstats
path: root/documentapi/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/main/java')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java77
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java27
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java15
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java71
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java43
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java15
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java12
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/Response.java17
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/SyncSession.java55
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java12
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java4
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java109
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java13
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java78
14 files changed, 439 insertions, 109 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java
index 9f4ceaad37f..60f70a91338 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java
@@ -7,6 +7,8 @@ import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentUpdate;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+
/**
* <p>A session for asynchronous access to a document repository.
* This class provides document repository writes and random access with high
@@ -42,10 +44,11 @@ public interface AsyncSession extends Session {
* If it was not a success, this method has no further effects.</p>
*
* @param document the Document to put
+ * @param priority the priority with which to send the operation
* @return the synchronous result of this operation
*/
default Result put(Document document, DocumentProtocol.Priority priority) {
- return put(new DocumentPut(document), priority);
+ return put(new DocumentPut(document), parameters().withPriority(priority));
}
/**
@@ -60,7 +63,7 @@ public interface AsyncSession extends Session {
* @return the synchronous result of this operation
*/
default Result put(DocumentPut documentPut) {
- return put(documentPut, DocumentProtocol.Priority.NORMAL_3);
+ return put(documentPut, parameters());
}
/**
@@ -72,10 +75,26 @@ public interface AsyncSession extends Session {
* If it was not a success, this method has no further effects.</p>
*
* @param documentPut the DocumentPut to perform
+ * @param priority the priority with which to send the operation
* @return the synchronous result of this operation
*/
- // TODO Vespa 8: Make this the one to implement.
default Result put(DocumentPut documentPut, DocumentProtocol.Priority priority) {
+ return put(documentPut, parameters().withPriority(priority));
+ }
+
+ /**
+ * <p>Puts a document, with optional conditions on the operation. This method returns immediately.</p>
+ *
+ * <p>If this result is a success, this
+ * call will cause one or more {@link DocumentResponse} objects to appear within the timeout time of this session.
+ * The response returned later will either be a success, or contain the document submitted here.
+ * If it was not a success, this method has no further effects.</p>
+ *
+ * @param documentPut the DocumentPut to perform
+ * @param parameters parameters for the operation
+ * @return the synchronous result of this operation
+ */
+ default Result put(DocumentPut documentPut, DocumentOperationParameters parameters) {
return put(documentPut.getDocument());
}
@@ -126,6 +145,23 @@ public interface AsyncSession extends Session {
* @throws UnsupportedOperationException if this access implementation does not support retrieving
*/
default Result get(DocumentId id, DocumentProtocol.Priority priority) {
+ return get(id, parameters().withPriority(priority));
+ }
+
+ /**
+ * <p>Gets a document. This method returns immediately.</p>
+ *
+ * <p>If this result is a success, this
+ * call will cause one or more {@link DocumentResponse} objects to appear within the timeout time of this session.
+ * The response returned later will contain the requested document if it is a success.
+ * If it was not a success, this method has no further effects.</p>
+ *
+ * @param id the id of the document to get
+ * @param parameters parameters for the operation
+ * @return the synchronous result of this operation
+ * @throws UnsupportedOperationException if this access implementation does not support retrieving
+ */
+ default Result get(DocumentId id, DocumentOperationParameters parameters) {
return get(id);
}
@@ -158,6 +194,23 @@ public interface AsyncSession extends Session {
* @throws UnsupportedOperationException if this access implementation does not support removal
*/
default Result remove(DocumentId id, DocumentProtocol.Priority priority) {
+ return remove(id, parameters().withPriority(priority));
+ }
+
+ /**
+ * <p>Removes a document if it is present. This method returns immediately.</p>
+ *
+ * <p>If this result is a success, this
+ * call will cause one or more {@link DocumentIdResponse} objects to appear within the timeout time of this session.
+ * The response returned later will either be a success, or contain the document id submitted here.
+ * If it was not a success, this method has no further effects.</p>
+ *
+ * @param id the id of the document to remove
+ * @param parameters parameters for the operation
+ * @return the synchronous result of this operation
+ * @throws UnsupportedOperationException if this access implementation does not support removal
+ */
+ default Result remove(DocumentId id, DocumentOperationParameters parameters) {
return remove(id);
}
@@ -189,6 +242,23 @@ public interface AsyncSession extends Session {
* @throws UnsupportedOperationException if this access implementation does not support update
*/
default Result update(DocumentUpdate update, DocumentProtocol.Priority priority) {
+ return update(update, parameters().withPriority(priority));
+ }
+
+ /**
+ * <p>Updates a document. This method returns immediately.</p>
+ *
+ * <p>If this result is a success, this
+ * call will cause one or more {@link DocumentUpdateResponse} within the timeout time of this session.
+ * The returned response returned later will either be a success or contain the update submitted here.
+ * If it was not a success, this method has no further effects.</p>
+ *
+ * @param update the updates to perform
+ * @param parameters parameters for the operation
+ * @return the synchronous result of this operation
+ * @throws UnsupportedOperationException if this access implementation does not support update
+ */
+ default Result update(DocumentUpdate update, DocumentOperationParameters parameters) {
return update(update);
}
@@ -199,4 +269,5 @@ public interface AsyncSession extends Session {
*/
double getCurrentWindowSize();
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java
index 308eafcd596..1aa5c4c0df0 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentAccess.java
@@ -5,6 +5,7 @@ import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentTypeManagerConfigurer;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.documentapi.messagebus.MessageBusDocumentAccess;
/**
* <p>This is the starting point of the <b>document api</b>. This api provides
@@ -27,9 +28,9 @@ import com.yahoo.config.subscription.ConfigSubscriber;
* <p>This class is the factory for creating the four session types mentioned above.</p>
*
* <p>There may be multiple implementations of the document api classes. If
- * default configuration is sufficient, use the {@link #createDefault} method to
- * return a running document access. Note that there are running threads within
- * an access object, so you must shut it down when done.</p>
+ * default configuration is sufficient, simply inject a {@code DocumentAccess} to
+ * obtain a running document access. If you instead create a concrete implementation, note that
+ * there are running threads within an access object, so you must shut it down when done.</p>
*
* <p>An implementation of the Document Api may support just a subset of the
* access types defined in this interface. For example, some document
@@ -55,13 +56,29 @@ public abstract class DocumentAccess {
* while attempting to create such an object, this method will throw an
* exception.
*
- * @deprecated Inject a DocumentManagerConfig and create a MessageBusDocumentAccess from this instead.
+ * @deprecated DocumentAccess may be injected in containers — otherwise use {@link #createForNonContainer()}.
*
* @return a running document access object with all default configuration
*/
@Deprecated(since = "7")
public static DocumentAccess createDefault() {
- return new com.yahoo.documentapi.messagebus.MessageBusDocumentAccess();
+ return new MessageBusDocumentAccess();
+ }
+
+
+ /**
+ * This is a convenience method to return a document access object when running
+ * outside of a Vespa application container, with all default parameter values.
+ * The client that calls this method is also responsible for shutting the object
+ * down when done. If an error occurred while attempting to create such an object,
+ * this method will throw an exception.
+ * This document access requires new config subscriptions to be set up, which should
+ * be avoided in application containers, but is suitable for, e.g., CLIs.
+ *
+ * @return a running document access object with all default configuration
+ */
+ public static DocumentAccess createForNonContainer() {
+ return new MessageBusDocumentAccess();
}
/**
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java
index 34ab47571cf..e4a44fb88cd 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentIdResponse.java
@@ -2,6 +2,7 @@
package com.yahoo.documentapi;
import com.yahoo.document.DocumentId;
+import com.yahoo.messagebus.Trace;
/**
* The asynchronous response to a document remove operation.
@@ -61,7 +62,19 @@ public class DocumentIdResponse extends Response {
* @param outcome the outcome of the operation
*/
public DocumentIdResponse(long requestId, DocumentId documentId, String textMessage, Outcome outcome) {
- super(requestId, textMessage, outcome);
+ this(requestId, documentId, textMessage, outcome, null);
+ }
+
+
+ /**
+ * Creates a response containing a textual message and/or a document id
+ *
+ * @param documentId the DocumentId to encapsulate in the Response
+ * @param textMessage the message to encapsulate in the Response
+ * @param outcome the outcome of the operation
+ */
+ public DocumentIdResponse(long requestId, DocumentId documentId, String textMessage, Outcome outcome, Trace trace) {
+ super(requestId, textMessage, outcome, null);
this.documentId = documentId;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
new file mode 100644
index 00000000000..3258c2f5b2c
--- /dev/null
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
@@ -0,0 +1,71 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi;
+
+import com.yahoo.document.fieldset.FieldSet;
+import com.yahoo.document.fieldset.FieldSetRepo;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Optional parameters for a document operation. Immutable class.
+ *
+ * @author jonmv
+ */
+public class DocumentOperationParameters {
+
+ private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1);
+
+ private final DocumentProtocol.Priority priority;
+ private final String fieldSet;
+ private final String route;
+ private final int traceLevel;
+
+ private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, int traceLevel) {
+ this.priority = priority;
+ this.fieldSet = fieldSet;
+ this.route = route;
+ this.traceLevel = traceLevel;
+ }
+
+ public static DocumentOperationParameters parameters() {
+ return empty;
+ }
+
+ /** Sets the priority with which to perform an operation. */
+ public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) {
+ return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel);
+ }
+
+ /** Sets the field set used for retrieval. */
+ public DocumentOperationParameters withFieldSet(FieldSet fieldSet) {
+ return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel);
+ }
+
+ /** Sets the field set used for retrieval. */
+ public DocumentOperationParameters withFieldSet(String fieldSet) {
+ return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel);
+ }
+
+ /** Sets the route along which to send the operation. */
+ public DocumentOperationParameters withRoute(String route) {
+ return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel);
+ }
+
+ /** Sets the trace level for an operation. */
+ public DocumentOperationParameters withTraceLevel(int traceLevel) {
+ if (traceLevel < 0 || traceLevel > 9)
+ throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)");
+
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel);
+ }
+
+ public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); }
+ public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); }
+ public Optional<String> route() { return Optional.ofNullable(route); }
+ public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); }
+
+}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java
index 172e5fd11c0..2a02b70d4fc 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentResponse.java
@@ -2,6 +2,7 @@
package com.yahoo.documentapi;
import com.yahoo.document.Document;
+import com.yahoo.messagebus.Trace;
/**
* The asynchronous response to a document put or get operation.
@@ -25,30 +26,28 @@ public class DocumentResponse extends Response {
* @param document the Document to encapsulate in the Response
*/
public DocumentResponse(long requestId, Document document) {
- super(requestId);
- this.document = document;
+ this(requestId, document, null);
}
/**
- * Creates a response containing a textual message
+ * Creates a successful response containing a document
*
- * @param textMessage the message to encapsulate in the Response
- * @param success true if the response represents a successful call
+ * @param document the Document to encapsulate in the Response
*/
- @Deprecated(since = "7") // TODO: Remove on Vespa 8
- public DocumentResponse(long requestId, String textMessage, boolean success) {
- super(requestId, textMessage, success);
- document = null;
+ public DocumentResponse(long requestId, Document document, Trace trace) {
+ this(requestId, document, null, document != null ? Outcome.SUCCESS : Outcome.NOT_FOUND, trace);
}
/**
* Creates a response containing a textual message
*
* @param textMessage the message to encapsulate in the Response
- * @param outcome the outcome of this operation
+ * @param success true if the response represents a successful call
*/
- public DocumentResponse(long requestId, String textMessage, Outcome outcome) {
- this(requestId, null, textMessage, outcome);
+ @Deprecated(since = "7") // TODO: Remove on Vespa 8
+ public DocumentResponse(long requestId, String textMessage, boolean success) {
+ super(requestId, textMessage, success ? Outcome.NOT_FOUND : Outcome.ERROR);
+ document = null;
}
/**
@@ -72,7 +71,19 @@ public class DocumentResponse extends Response {
* @param outcome the outcome of this operation
*/
public DocumentResponse(long requestId, Document document, String textMessage, Outcome outcome) {
- super(requestId, textMessage, outcome);
+ this(requestId, document, textMessage, outcome, null);
+ }
+
+
+ /**
+ * Creates a response containing a textual message and/or a document
+ *
+ * @param document the Document to encapsulate in the Response
+ * @param textMessage the message to encapsulate in the Response
+ * @param outcome the outcome of this operation
+ */
+ public DocumentResponse(long requestId, Document document, String textMessage, Outcome outcome, Trace trace) {
+ super(requestId, textMessage, outcome, trace);
this.document = document;
}
@@ -84,6 +95,12 @@ public class DocumentResponse extends Response {
*/
public Document getDocument() { return document; }
+ @Override
+ public boolean isSuccess() {
+ // TODO: is it right that Get operations are successful without a result, in this API?
+ return super.isSuccess() || outcome() == Outcome.NOT_FOUND;
+ }
+
public int hashCode() {
return super.hashCode() + (document == null ? 0 : document.hashCode());
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java
index 3294c216d96..d34873aeaa6 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentUpdateResponse.java
@@ -2,6 +2,7 @@
package com.yahoo.documentapi;
import com.yahoo.document.DocumentUpdate;
+import com.yahoo.messagebus.Trace;
/**
* The asynchronous response to a document update operation.
@@ -71,7 +72,19 @@ public class DocumentUpdateResponse extends Response {
* @param outcome the outcome of this operation
*/
public DocumentUpdateResponse(long requestId, DocumentUpdate documentUpdate, String textMessage, Outcome outcome) {
- super(requestId, textMessage, outcome);
+ this(requestId, documentUpdate, textMessage, outcome, null);
+ }
+
+
+ /**
+ * Creates a response containing a textual message and/or a document update
+ *
+ * @param documentUpdate the DocumentUpdate to encapsulate in the Response
+ * @param textMessage the message to encapsulate in the Response
+ * @param outcome the outcome of this operation
+ */
+ public DocumentUpdateResponse(long requestId, DocumentUpdate documentUpdate, String textMessage, Outcome outcome, Trace trace) {
+ super(requestId, textMessage, outcome, trace);
this.documentUpdate = documentUpdate;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java
index 502588a3d5f..2d3f2934890 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/RemoveResponse.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi;
+import com.yahoo.messagebus.Trace;
+
/**
* This response is provided for successful document remove operations. Use the
* wasFound() method to check whether or not the document was actually found.
@@ -12,7 +14,11 @@ public class RemoveResponse extends Response {
private final boolean wasFound;
public RemoveResponse(long requestId, boolean wasFound) {
- super(requestId);
+ this(requestId, wasFound, null);
+ }
+
+ public RemoveResponse(long requestId, boolean wasFound, Trace trace) {
+ super(requestId, null, wasFound ? Outcome.SUCCESS : Outcome.NOT_FOUND, trace);
this.wasFound = wasFound;
}
@@ -21,6 +27,10 @@ public class RemoveResponse extends Response {
}
@Override
+ // TODO: fix this when/if NOT_FOUND is no longer a success.
+ public boolean isSuccess() { return super.isSuccess() || outcome() == Outcome.NOT_FOUND; }
+
+ @Override
public int hashCode() {
return super.hashCode() + Boolean.valueOf(wasFound).hashCode();
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/Response.java b/documentapi/src/main/java/com/yahoo/documentapi/Response.java
index 4c95a648949..0a541a92c6a 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/Response.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/Response.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi;
+import com.yahoo.messagebus.Trace;
+
import java.util.Objects;
import static com.yahoo.documentapi.Response.Outcome.ERROR;
@@ -19,6 +21,7 @@ public class Response {
private final long requestId;
private final String textMessage;
private final Outcome outcome;
+ private final Trace trace;
/** Creates a successful response containing no information */
public Response(long requestId) {
@@ -52,9 +55,20 @@ public class Response {
* @param outcome the outcome of the operation
*/
public Response(long requestId, String textMessage, Outcome outcome) {
+ this(requestId, textMessage, outcome, null);
+ }
+
+ /**
+ * Creates a response containing a textual message
+ *
+ * @param textMessage the message to encapsulate in the Response
+ * @param outcome the outcome of the operation
+ */
+ public Response(long requestId, String textMessage, Outcome outcome, Trace trace) {
this.requestId = requestId;
this.textMessage = textMessage;
this.outcome = outcome;
+ this.trace = trace;
}
/**
@@ -76,6 +90,9 @@ public class Response {
public long getRequestId() { return requestId; }
+ /** Returns the trace of this operation, or null if there is none. */
+ public Trace getTrace() { return trace; }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java
index 24fd47ed12c..1cee3249032 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java
@@ -11,6 +11,8 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import java.time.Duration;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+
/**
* A session for synchronous access to a document repository. This class
* provides simple document access where throughput is not a concern.
@@ -35,6 +37,18 @@ public interface SyncSession extends Session {
* @param priority the priority with which to perform this operation
*/
default void put(DocumentPut documentPut, DocumentProtocol.Priority priority) {
+ put(documentPut, parameters().withPriority(priority));
+ }
+
+ /**
+ * Puts a document. When this method returns, the document is safely received.
+ *
+ * @param documentPut the DocumentPut operation
+ * @param parameters parameters for the operation
+ *
+ * @param documentPut the DocumentPut operation
+ */
+ default void put(DocumentPut documentPut, DocumentOperationParameters parameters) {
put(documentPut);
}
@@ -85,6 +99,20 @@ public interface SyncSession extends Session {
Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority priority, Duration timeout);
/**
+ * Gets a document with timeout.
+ *
+ * @param id the id of the document to get
+ * @param parameters parameters for the operation
+ * @param timeout timeout. If timeout is null, an unspecified default will be used
+ * @return the known document having this id, or null if there is no document having this id
+ * @throws UnsupportedOperationException thrown if this access does not support retrieving
+ * @throws DocumentAccessException on any messagebus error, including timeout ({@link com.yahoo.messagebus.ErrorCode#TIMEOUT})
+ */
+ default Document get(DocumentId id, DocumentOperationParameters parameters, Duration timeout) {
+ return get(id, timeout);
+ }
+
+ /**
* Removes a document if it is present and condition is fulfilled.
*
* @param documentRemove document to delete
@@ -103,6 +131,18 @@ public interface SyncSession extends Session {
boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority priority);
/**
+ * Removes a document if it is present.
+ *
+ * @param documentRemove document remove operation
+ * @param parameters parameters for the operation
+ * @return true if the document with this id was removed, false otherwise.
+ * @throws UnsupportedOperationException thrown if this access does not support removal
+ */
+ default boolean remove(DocumentRemove documentRemove, DocumentOperationParameters parameters) {
+ return remove(documentRemove);
+ }
+
+ /**
* Updates a document.
*
* @param update the updates to perform
@@ -127,4 +167,19 @@ public interface SyncSession extends Session {
*/
boolean update(DocumentUpdate update, DocumentProtocol.Priority priority);
+ /**
+ * Updates a document.
+ *
+ * @param update the updates to perform.
+ * @param parameters parameters for the operation
+ * @return false if the updates could not be applied as the document does not exist and
+ * {@link DocumentUpdate#setCreateIfNonExistent(boolean) create-if-non-existent} is not set.
+ * @throws DocumentAccessException on update error, including but not limited to: 1. timeouts,
+ * 2. the document exists but the {@link DocumentUpdate#setCondition(TestAndSetCondition) condition}
+ * is not met.
+ */
+ default boolean update(DocumentUpdate update, DocumentOperationParameters parameters) {
+ return update(update);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java b/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java
index 96bf58c1e64..54c338dc6c9 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/UpdateResponse.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi;
+import com.yahoo.messagebus.Trace;
+
/**
* This response is provided for successful document update operations. Use the
* wasFound() method to check whether or not the document was actually found.
@@ -12,7 +14,11 @@ public class UpdateResponse extends Response {
private final boolean wasFound;
public UpdateResponse(long requestId, boolean wasFound) {
- super(requestId);
+ this(requestId, wasFound, null);
+ }
+
+ public UpdateResponse(long requestId, boolean wasFound, Trace trace) {
+ super(requestId, null, wasFound ? Outcome.SUCCESS : Outcome.NOT_FOUND, trace);
this.wasFound = wasFound;
}
@@ -21,6 +27,10 @@ public class UpdateResponse extends Response {
}
@Override
+ // TODO: fix this when/if NOT_FOUND is no longer a success.
+ public boolean isSuccess() { return super.isSuccess() || outcome() == Outcome.NOT_FOUND; }
+
+ @Override
public int hashCode() {
return super.hashCode() + Boolean.valueOf(wasFound).hashCode();
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 398675e594e..40f26a82a89 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -61,7 +61,7 @@ public class LocalAsyncSession implements AsyncSession {
long req = getNextRequestId();
try {
syncSession.put(documentPut, pri);
- addResponse(new DocumentResponse(req));
+ addResponse(new DocumentResponse(req, documentPut.getDocument()));
} catch (Exception e) {
addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR));
}
@@ -85,7 +85,7 @@ public class LocalAsyncSession implements AsyncSession {
try {
addResponse(new DocumentResponse(req, syncSession.get(id)));
} catch (Exception e) {
- addResponse(new DocumentResponse(req, e.getMessage(), Response.Outcome.ERROR));
+ addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR));
}
return new Result(req);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
index 7471d285db1..7a71089c180 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
@@ -9,6 +9,7 @@ import com.yahoo.document.fieldset.AllFields;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentIdResponse;
+import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
import com.yahoo.documentapi.DocumentUpdateResponse;
import com.yahoo.documentapi.RemoveResponse;
@@ -41,9 +42,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
import static com.yahoo.documentapi.Response.Outcome.CONDITION_FAILED;
import static com.yahoo.documentapi.Response.Outcome.ERROR;
import static com.yahoo.documentapi.Response.Outcome.NOT_FOUND;
+import static com.yahoo.documentapi.Response.Outcome.SUCCESS;
/**
* An access session which wraps a messagebus source session sending document messages.
@@ -96,19 +99,24 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
@Override
public Result put(Document document) {
- return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3);
+ return put(new DocumentPut(document), parameters());
}
@Override
public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
+ return put(documentPut, parameters().withPriority(pri));
+ }
+
+ @Override
+ public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) {
PutDocumentMessage msg = new PutDocumentMessage(documentPut);
- msg.setPriority(pri);
- return send(msg);
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_3));
+ return send(msg, parameters);
}
@Override
public Result get(DocumentId id) {
- return get(id, DocumentProtocol.Priority.NORMAL_1);
+ return get(id, parameters());
}
@Override
@@ -119,35 +127,52 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
@Override
public Result get(DocumentId id, DocumentProtocol.Priority pri) {
- GetDocumentMessage msg = new GetDocumentMessage(id, AllFields.NAME);
- msg.setPriority(pri);
- return send(msg);
+ return get(id, parameters().withPriority(pri));
+ }
+
+ @Override
+ public Result get(DocumentId id, DocumentOperationParameters parameters) {
+ GetDocumentMessage msg = new GetDocumentMessage(id, parameters.fieldSet().orElse(AllFields.NAME));
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_1));
+ return send(msg, parameters);
}
@Override
public Result remove(DocumentId id) {
- return remove(id, DocumentProtocol.Priority.NORMAL_2);
+ return remove(id, parameters());
}
@Override
public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
+ return remove(id, parameters().withPriority(pri));
+ }
+
+ @Override
+ public Result remove(DocumentId id, DocumentOperationParameters parameters) {
RemoveDocumentMessage msg = new RemoveDocumentMessage(id);
- msg.setPriority(pri);
- return send(msg);
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2));
+ return send(msg, parameters);
}
@Override
public Result update(DocumentUpdate update) {
- return update(update, DocumentProtocol.Priority.NORMAL_2);
+ return update(update, parameters());
}
@Override
public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
+ return update(update, parameters().withPriority(pri));
+ }
+
+ @Override
+ public Result update(DocumentUpdate update, DocumentOperationParameters parameters) {
UpdateDocumentMessage msg = new UpdateDocumentMessage(update);
- msg.setPriority(pri);
- return send(msg);
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2));
+ return send(msg, parameters);
}
+ // TODO jonmv: Was this done to remedy get route no longer being possible to set through doc/v1 after default-get was added?
+ // TODO jonmv: If so, this is no longer needed with doc/v1.1 and later.
private boolean mayOverrideWithGetOnlyRoute(Message msg) {
// Only allow implicitly overriding the default Get route if the message is attempted sent
// with the default route originally. Otherwise it's reasonable to assume that the caller
@@ -156,19 +181,13 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
&& ("default".equals(route) || "route:default".equals(route)));
}
- /**
- * A convenience method for assigning the internal trace level and route string to a message before sending it
- * through the internal mbus session object.
- *
- * @param msg the message to send.
- * @return the document api result object.
- */
- public Result send(Message msg) {
+ Result send(Message msg, DocumentOperationParameters parameters) {
try {
long reqId = requestId.incrementAndGet();
msg.setContext(reqId);
- msg.getTrace().setLevel(traceLevel);
- String toRoute = (mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route);
+ msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel));
+ // Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew!
+ String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route);
if (toRoute != null) {
return toResult(reqId, session.send(msg, toRoute, true));
} else {
@@ -179,6 +198,17 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
}
}
+ /**
+ * A convenience method for assigning the internal trace level and route string to a message before sending it
+ * through the internal mbus session object.
+ *
+ * @param msg the message to send.
+ * @return the document api result object.
+ */
+ public Result send(Message msg) {
+ return send(msg, null);
+ }
+
@Override
public Response getNext() {
return responses.poll();
@@ -256,7 +286,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
}
private static Response toResponse(Reply reply) {
- long reqId = (Long)reply.getContext();
+ long reqId = (Long) reply.getContext();
return reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId);
}
@@ -269,15 +299,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
String err = getErrorMessage(reply);
switch (msg.getType()) {
case DocumentProtocol.MESSAGE_PUTDOCUMENT:
- return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, outcome);
+ return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument(), err, outcome, reply.getTrace());
case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
- return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, outcome);
+ return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate(), err, outcome, reply.getTrace());
case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
- return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, outcome);
+ return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId(), err, outcome, reply.getTrace());
case DocumentProtocol.MESSAGE_GETDOCUMENT:
- return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, outcome);
+ return new DocumentIdResponse(reqId, ((GetDocumentMessage)msg).getDocumentId(), err, outcome, reply.getTrace());
default:
- return new Response(reqId, err, outcome);
+ return new Response(reqId, err, outcome, reply.getTrace());
}
}
@@ -289,26 +319,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
if (getDoc != null) {
getDoc.setLastModified(docReply.getLastModified());
}
- return new DocumentResponse(reqId, getDoc);
+ return new DocumentResponse(reqId, getDoc, reply.getTrace());
case DocumentProtocol.REPLY_REMOVEDOCUMENT:
- return new RemoveResponse(reqId, ((RemoveDocumentReply)reply).wasFound());
+ return new RemoveResponse(reqId, ((RemoveDocumentReply)reply).wasFound(), reply.getTrace());
case DocumentProtocol.REPLY_UPDATEDOCUMENT:
- return new UpdateResponse(reqId, ((UpdateDocumentReply)reply).wasFound());
+ return new UpdateResponse(reqId, ((UpdateDocumentReply)reply).wasFound(), reply.getTrace());
case DocumentProtocol.REPLY_PUTDOCUMENT:
- break;
- default:
- return new Response(reqId);
- }
- Message msg = reply.getMessage();
- switch (msg.getType()) {
- case DocumentProtocol.MESSAGE_PUTDOCUMENT:
- return new DocumentResponse(reqId, ((PutDocumentMessage)msg).getDocumentPut().getDocument());
- case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
- return new DocumentIdResponse(reqId, ((RemoveDocumentMessage)msg).getDocumentId());
- case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
- return new DocumentUpdateResponse(reqId, ((UpdateDocumentMessage)msg).getDocumentUpdate());
+ return new DocumentResponse(reqId, ((PutDocumentMessage)reply.getMessage()).getDocumentPut().getDocument(), reply.getTrace());
default:
- return new Response(reqId);
+ return new Response(reqId, null, SUCCESS, reply.getTrace());
}
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
index ff891dbb298..9832529c157 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusDocumentAccess.java
@@ -20,6 +20,8 @@ import com.yahoo.messagebus.network.local.LocalNetwork;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* This class implements the {@link DocumentAccess} interface using message bus for communication.
@@ -29,6 +31,8 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class MessageBusDocumentAccess extends DocumentAccess {
+ private static final Logger log = Logger.getLogger(MessageBusDocumentAccess.class.getName());
+
private final NetworkMessageBus bus;
private final MessageBusParams params;
@@ -60,7 +64,12 @@ public class MessageBusDocumentAccess extends DocumentAccess {
bus = new NetworkMessageBus(network, new MessageBus(network, mbusParams));
}
else {
- bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId());
+ if (params.getRPCNetworkParams().getSlobroksConfig() != null && mbusParams.getMessageBusConfig() != null)
+ bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams());
+ else {
+ log.log(Level.FINE, () -> "Setting up self-subscription to config because explicit config was missing; try to avoid this in containers");
+ bus = new RPCMessageBus(mbusParams, params.getRPCNetworkParams(), params.getRoutingConfigId());
+ }
}
}
catch (Exception e) {
@@ -92,7 +101,7 @@ public class MessageBusDocumentAccess extends DocumentAccess {
@Override
public MessageBusVisitorSession createVisitorSession(VisitorParameters params) throws ParseException, IllegalArgumentException {
MessageBusVisitorSession session = MessageBusVisitorSession.createForMessageBus(
- bus.getMessageBus(), scheduledExecutorService, params);
+ messageBus(), scheduledExecutorService, params);
session.start();
return session;
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java
index a9621950b88..c7ab8a23e11 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java
@@ -9,6 +9,7 @@ import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.fieldset.AllFields;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.DocumentAccessException;
+import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.Response;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.SyncParameters;
@@ -28,6 +29,8 @@ import com.yahoo.messagebus.ReplyHandler;
import java.time.Duration;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+
/**
* An implementation of the SyncSession interface running over message bus.
*
@@ -84,10 +87,14 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re
* @return The reply received.
*/
public Reply syncSend(Message msg) {
- return syncSend(msg, defaultTimeout);
+ return syncSend(msg, parameters());
+ }
+
+ private Reply syncSend(Message msg, DocumentOperationParameters parameters) {
+ return syncSend(msg, defaultTimeout, parameters());
}
- private Reply syncSend(Message msg, Duration timeout) {
+ private Reply syncSend(Message msg, Duration timeout, DocumentOperationParameters parameters) {
if (timeout != null) {
msg.setTimeRemaining(timeout.toMillis());
}
@@ -97,7 +104,7 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re
msg.pushHandler(this); // store monitor
Result result = null;
while (result == null || result.type() == Result.ResultType.TRANSIENT_ERROR) {
- result = session.send(msg);
+ result = session.send(msg, parameters);
if (result != null && result.isSuccess()) {
break;
}
@@ -114,37 +121,40 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re
@Override
public void put(DocumentPut documentPut) {
- put(documentPut, DocumentProtocol.Priority.NORMAL_3);
+ put(documentPut, parameters());
}
@Override
public void put(DocumentPut documentPut, DocumentProtocol.Priority priority) {
- PutDocumentMessage msg = new PutDocumentMessage(documentPut);
- msg.setPriority(priority);
- syncSendPutDocumentMessage(msg);
+ put(documentPut, parameters().withPriority(priority));
}
@Override
- public Document get(DocumentId id) {
- return get(id, null);
+ public void put(DocumentPut documentPut, DocumentOperationParameters parameters) {
+ PutDocumentMessage msg = new PutDocumentMessage(documentPut);
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_3));
+ Reply reply = syncSend(msg, parameters);
+ if (reply.hasErrors()) {
+ throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), reply.getErrorCodes());
+ }
}
@Override
- public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri) {
- return get(id, fieldSet, pri, null);
+ public Document get(DocumentId id, Duration timeout) {
+ return get(id, parameters(), timeout);
}
@Override
- public Document get(DocumentId id, Duration timeout) {
- return get(id, AllFields.NAME, DocumentProtocol.Priority.NORMAL_1, timeout);
+ public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri, Duration timeout) {
+ return get(id, parameters().withFieldSet(fieldSet).withPriority(pri), timeout);
}
@Override
- public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri, Duration timeout) {
- GetDocumentMessage msg = new GetDocumentMessage(id, fieldSet);
- msg.setPriority(pri);
+ public Document get(DocumentId id, DocumentOperationParameters parameters, Duration timeout) {
+ GetDocumentMessage msg = new GetDocumentMessage(id, parameters.fieldSet().orElse(AllFields.NAME));
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_1));
- Reply reply = syncSend(msg, timeout != null ? timeout : defaultTimeout);
+ Reply reply = syncSend(msg, timeout != null ? timeout : defaultTimeout, parameters);
if (reply.hasErrors()) {
throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply));
}
@@ -161,21 +171,20 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re
@Override
public boolean remove(DocumentRemove documentRemove) {
- RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId());
- msg.setCondition(documentRemove.getCondition());
- return remove(msg);
+ return remove(documentRemove, parameters());
}
@Override
public boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority pri) {
- RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId());
- msg.setPriority(pri);
- msg.setCondition(documentRemove.getCondition());
- return remove(msg);
+ return remove(documentRemove, parameters().withPriority(pri));
}
- private boolean remove(RemoveDocumentMessage msg) {
- Reply reply = syncSend(msg);
+ @Override
+ public boolean remove(DocumentRemove documentRemove, DocumentOperationParameters parameters) {
+ RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId());
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2));
+ msg.setCondition(documentRemove.getCondition());
+ Reply reply = syncSend(msg, parameters);
if (reply.hasErrors()) {
throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply));
}
@@ -187,14 +196,19 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re
@Override
public boolean update(DocumentUpdate update) {
- return update(update, DocumentProtocol.Priority.NORMAL_2);
+ return update(update, parameters());
}
@Override
public boolean update(DocumentUpdate update, DocumentProtocol.Priority pri) {
+ return update(update, parameters().withPriority(pri));
+ }
+
+ @Override
+ public boolean update(DocumentUpdate update, DocumentOperationParameters parameters) {
UpdateDocumentMessage msg = new UpdateDocumentMessage(update);
- msg.setPriority(pri);
- Reply reply = syncSend(msg);
+ msg.setPriority(parameters.priority().orElse(DocumentProtocol.Priority.NORMAL_2));
+ Reply reply = syncSend(msg, parameters);
if (reply.hasErrors()) {
throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), reply.getErrorCodes());
}
@@ -243,10 +257,4 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re
}
}
- private void syncSendPutDocumentMessage(PutDocumentMessage putDocumentMessage) {
- Reply reply = syncSend(putDocumentMessage);
- if (reply.hasErrors()) {
- throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), reply.getErrorCodes());
- }
- }
}