diff options
7 files changed, 274 insertions, 36 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java index 9f4ceaad37f..721faf281f7 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/AsyncSession.java @@ -7,6 +7,8 @@ import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentUpdate; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; + /** * <p>A session for asynchronous access to a document repository. * This class provides document repository writes and random access with high @@ -42,10 +44,11 @@ public interface AsyncSession extends Session { * If it was not a success, this method has no further effects.</p> * * @param document the Document to put + * @param priority the priority with which to send the operation * @return the synchronous result of this operation */ default Result put(Document document, DocumentProtocol.Priority priority) { - return put(new DocumentPut(document), priority); + return put(new DocumentPut(document), parameters().withPriority(priority)); } /** @@ -60,7 +63,7 @@ public interface AsyncSession extends Session { * @return the synchronous result of this operation */ default Result put(DocumentPut documentPut) { - return put(documentPut, DocumentProtocol.Priority.NORMAL_3); + return put(documentPut, parameters()); } /** @@ -72,10 +75,26 @@ public interface AsyncSession extends Session { * If it was not a success, this method has no further effects.</p> * * @param documentPut the DocumentPut to perform + * @param priority the priority with which to send the operation * @return the synchronous result of this operation */ - // TODO Vespa 8: Make this the one to implement. default Result put(DocumentPut documentPut, DocumentProtocol.Priority priority) { + return put(documentPut, parameters().withPriority(priority)); + } + + /** + * <p>Puts a document, with optional conditions on the operation. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentResponse} objects to appear within the timeout time of this session. + * The response returned later will either be a success, or contain the document submitted here. + * If it was not a success, this method has no further effects.</p> + * + * @param documentPut the DocumentPut to perform + * @param parameters parameters for the operation + * @return the synchronous result of this operation + */ + default Result put(DocumentPut documentPut, DocumentOperationParameters parameters) { return put(documentPut.getDocument()); } @@ -126,6 +145,23 @@ public interface AsyncSession extends Session { * @throws UnsupportedOperationException if this access implementation does not support retrieving */ default Result get(DocumentId id, DocumentProtocol.Priority priority) { + return get(id, parameters().withPriority(priority)); + } + + /** + * <p>Gets a document. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentResponse} objects to appear within the timeout time of this session. + * The response returned later will contain the requested document if it is a success. + * If it was not a success, this method has no further effects.</p> + * + * @param id the id of the document to get + * @param parameters parameters for the operation + * @return the synchronous result of this operation + * @throws UnsupportedOperationException if this access implementation does not support retrieving + */ + default Result get(DocumentId id, DocumentOperationParameters parameters) { return get(id); } @@ -158,6 +194,23 @@ public interface AsyncSession extends Session { * @throws UnsupportedOperationException if this access implementation does not support removal */ default Result remove(DocumentId id, DocumentProtocol.Priority priority) { + return remove(id, parameters().withPriority(priority)); + } + + /** + * <p>Removes a document if it is present. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentIdResponse} objects to apprear within the timeout time of this session. + * The response returned later will either be a success, or contain the document id submitted here. + * If it was not a success, this method has no further effects.</p> + * + * @param id the id of the document to remove + * @param parameters parameters for the operation + * @return the synchronous result of this operation + * @throws UnsupportedOperationException if this access implementation does not support removal + */ + default Result remove(DocumentId id, DocumentOperationParameters parameters) { return remove(id); } @@ -189,6 +242,23 @@ public interface AsyncSession extends Session { * @throws UnsupportedOperationException if this access implementation does not support update */ default Result update(DocumentUpdate update, DocumentProtocol.Priority priority) { + return update(update, parameters().withPriority(priority)); + } + + /** + * <p>Updates a document. This method returns immediately.</p> + * + * <p>If this result is a success, this + * call will cause one or more {@link DocumentUpdateResponse} within the timeout time of this session. + * The returned response returned later will either be a success or contain the update submitted here. + * If it was not a success, this method has no further effects.</p> + * + * @param update the updates to perform + * @param parameters parameters for the operation + * @return the synchronous result of this operation + * @throws UnsupportedOperationException if this access implementation does not support update + */ + default Result update(DocumentUpdate update, DocumentOperationParameters parameters) { return update(update); } @@ -199,4 +269,5 @@ public interface AsyncSession extends Session { */ double getCurrentWindowSize(); + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java new file mode 100644 index 00000000000..cfb134ffa4b --- /dev/null +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java @@ -0,0 +1,66 @@ +package com.yahoo.documentapi; + +import com.yahoo.document.fieldset.FieldSet; +import com.yahoo.document.fieldset.FieldSetRepo; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; + +import java.util.Optional; +import java.util.OptionalInt; + +import static java.util.Objects.requireNonNull; + +/** Optional parameters for a document operation. Immutable class. */ +public class DocumentOperationParameters { + + private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1); + + private final DocumentProtocol.Priority priority; + private final String fieldSet; + private final String route; + private final int traceLevel; + + private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, int traceLevel) { + this.priority = priority; + this.fieldSet = fieldSet; + this.route = route; + this.traceLevel = traceLevel; + } + + public static DocumentOperationParameters parameters() { + return empty; + } + + /** Sets the priority with which to perform an operation. */ + public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) { + return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel); + } + + /** Sets the field set used for retrieval. */ + public DocumentOperationParameters withFieldSet(FieldSet fieldSet) { + return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel); + } + + /** Sets the field set used for retrieval. */ + public DocumentOperationParameters withFieldSet(String fieldSet) { + return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel); + } + + /** Sets the route along which to send the operation. */ + public DocumentOperationParameters withRoute(String route) { + return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel); + } + + /** Sets the trace level for an operation. */ + public DocumentOperationParameters withTraceLevel(int traceLevel) { + if (traceLevel < 0 || traceLevel > 9) + throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)"); + + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel); + } + + public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); } + public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); } + public Optional<String> route() { return Optional.ofNullable(route); } + public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); } + +} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java index 24fd47ed12c..1cee3249032 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/SyncSession.java @@ -11,6 +11,8 @@ import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import java.time.Duration; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; + /** * A session for synchronous access to a document repository. This class * provides simple document access where throughput is not a concern. @@ -35,6 +37,18 @@ public interface SyncSession extends Session { * @param priority the priority with which to perform this operation */ default void put(DocumentPut documentPut, DocumentProtocol.Priority priority) { + put(documentPut, parameters().withPriority(priority)); + } + + /** + * Puts a document. When this method returns, the document is safely received. + * + * @param documentPut the DocumentPut operation + * @param parameters parameters for the operation + * + * @param documentPut the DocumentPut operation + */ + default void put(DocumentPut documentPut, DocumentOperationParameters parameters) { put(documentPut); } @@ -85,6 +99,20 @@ public interface SyncSession extends Session { Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority priority, Duration timeout); /** + * Gets a document with timeout. + * + * @param id the id of the document to get + * @param parameters parameters for the operation + * @param timeout timeout. If timeout is null, an unspecified default will be used + * @return the known document having this id, or null if there is no document having this id + * @throws UnsupportedOperationException thrown if this access does not support retrieving + * @throws DocumentAccessException on any messagebus error, including timeout ({@link com.yahoo.messagebus.ErrorCode#TIMEOUT}) + */ + default Document get(DocumentId id, DocumentOperationParameters parameters, Duration timeout) { + return get(id, timeout); + } + + /** * Removes a document if it is present and condition is fulfilled. * * @param documentRemove document to delete @@ -103,6 +131,18 @@ public interface SyncSession extends Session { boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority priority); /** + * Removes a document if it is present. + * + * @param documentRemove document remove operation + * @param parameters parameters for the operation + * @return true if the document with this id was removed, false otherwise. + * @throws UnsupportedOperationException thrown if this access does not support removal + */ + default boolean remove(DocumentRemove documentRemove, DocumentOperationParameters parameters) { + return remove(documentRemove); + } + + /** * Updates a document. * * @param update the updates to perform @@ -127,4 +167,19 @@ public interface SyncSession extends Session { */ boolean update(DocumentUpdate update, DocumentProtocol.Priority priority); + /** + * Updates a document. + * + * @param update the updates to perform. + * @param parameters parameters for the operation + * @return false if the updates could not be applied as the document does not exist and + * {@link DocumentUpdate#setCreateIfNonExistent(boolean) create-if-non-existent} is not set. + * @throws DocumentAccessException on update error, including but not limited to: 1. timeouts, + * 2. the document exists but the {@link DocumentUpdate#setCondition(TestAndSetCondition) condition} + * is not met. + */ + default boolean update(DocumentUpdate update, DocumentOperationParameters parameters) { + return update(update); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index 7471d285db1..98e7aec7b11 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -9,6 +9,7 @@ import com.yahoo.document.fieldset.AllFields; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentIdResponse; +import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; import com.yahoo.documentapi.DocumentUpdateResponse; import com.yahoo.documentapi.RemoveResponse; @@ -16,6 +17,7 @@ import com.yahoo.documentapi.Response; import com.yahoo.documentapi.ResponseHandler; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.UpdateResponse; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.GetDocumentReply; @@ -33,6 +35,7 @@ import com.yahoo.messagebus.ReplyHandler; import com.yahoo.messagebus.SourceSession; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; +import com.yahoo.messagebus.routing.Route; import java.util.Queue; import java.util.concurrent.BlockingQueue; @@ -41,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Logger; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; import static com.yahoo.documentapi.Response.Outcome.CONDITION_FAILED; import static com.yahoo.documentapi.Response.Outcome.ERROR; import static com.yahoo.documentapi.Response.Outcome.NOT_FOUND; @@ -96,19 +100,24 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { @Override public Result put(Document document) { - return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3); + return put(new DocumentPut(document), parameters()); } @Override public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { + return put(documentPut, parameters().withPriority(pri)); + } + + @Override + public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) { PutDocumentMessage msg = new PutDocumentMessage(documentPut); - msg.setPriority(pri); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_3); return send(msg); } @Override public Result get(DocumentId id) { - return get(id, DocumentProtocol.Priority.NORMAL_1); + return get(id, parameters()); } @Override @@ -119,35 +128,52 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { @Override public Result get(DocumentId id, DocumentProtocol.Priority pri) { - GetDocumentMessage msg = new GetDocumentMessage(id, AllFields.NAME); - msg.setPriority(pri); + return get(id, parameters().withPriority(pri)); + } + + @Override + public Result get(DocumentId id, DocumentOperationParameters parameters) { + GetDocumentMessage msg = new GetDocumentMessage(id, parameters.fieldSet().orElse(AllFields.NAME)); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_1); return send(msg); } @Override public Result remove(DocumentId id) { - return remove(id, DocumentProtocol.Priority.NORMAL_2); + return remove(id, parameters()); } @Override public Result remove(DocumentId id, DocumentProtocol.Priority pri) { + return remove(id, parameters().withPriority(pri)); + } + + @Override + public Result remove(DocumentId id, DocumentOperationParameters parameters) { RemoveDocumentMessage msg = new RemoveDocumentMessage(id); - msg.setPriority(pri); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_2); return send(msg); } @Override public Result update(DocumentUpdate update) { - return update(update, DocumentProtocol.Priority.NORMAL_2); + return update(update, parameters()); } @Override public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) { + return update(update, parameters().withPriority(pri)); + } + + @Override + public Result update(DocumentUpdate update, DocumentOperationParameters parameters) { UpdateDocumentMessage msg = new UpdateDocumentMessage(update); - msg.setPriority(pri); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_2); return send(msg); } + // TODO jonmv: Was this done to remedy get route no longer being possible to set through doc/v1 after default-get was added? + // TODO jonmv: If so, this is no longer needed with doc/v1.1 and later. private boolean mayOverrideWithGetOnlyRoute(Message msg) { // Only allow implicitly overriding the default Get route if the message is attempted sent // with the default route originally. Otherwise it's reasonable to assume that the caller @@ -168,7 +194,9 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { long reqId = requestId.incrementAndGet(); msg.setContext(reqId); msg.getTrace().setLevel(traceLevel); - String toRoute = (mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route); + // Use message route if set, or session route if non-default, or finally, defaults for get and non-get, if set. Phew! + String toRoute = msg.getRoute() != null ? msg.getRoute().toString() + : (mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route); if (toRoute != null) { return toResult(reqId, session.send(msg, toRoute, true)); } else { @@ -336,4 +364,11 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { } } + static void setParameters(DocumentMessage message, DocumentOperationParameters parameters, + DocumentProtocol.Priority defaultPriority) { + message.setPriority(parameters.priority().orElse(defaultPriority)); + parameters.route().map(Route::parse).ifPresent(message::setRoute); + parameters.traceLevel().ifPresent(message.getTrace()::setLevel); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java index a9621950b88..7bf0976f249 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusSyncSession.java @@ -9,6 +9,7 @@ import com.yahoo.document.DocumentUpdate; import com.yahoo.document.fieldset.AllFields; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.DocumentAccessException; +import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.Response; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.SyncParameters; @@ -25,9 +26,13 @@ import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBus; import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.routing.Route; import java.time.Duration; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; +import static com.yahoo.documentapi.messagebus.MessageBusAsyncSession.setParameters; + /** * An implementation of the SyncSession interface running over message bus. * @@ -114,35 +119,35 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re @Override public void put(DocumentPut documentPut) { - put(documentPut, DocumentProtocol.Priority.NORMAL_3); + put(documentPut, parameters()); } @Override public void put(DocumentPut documentPut, DocumentProtocol.Priority priority) { - PutDocumentMessage msg = new PutDocumentMessage(documentPut); - msg.setPriority(priority); - syncSendPutDocumentMessage(msg); + put(documentPut, parameters().withPriority(priority)); } @Override - public Document get(DocumentId id) { - return get(id, null); + public void put(DocumentPut documentPut, DocumentOperationParameters parameters) { + PutDocumentMessage msg = new PutDocumentMessage(documentPut); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_3); + syncSendPutDocumentMessage(msg); } @Override - public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri) { - return get(id, fieldSet, pri, null); + public Document get(DocumentId id, Duration timeout) { + return get(id, parameters(), timeout); } @Override - public Document get(DocumentId id, Duration timeout) { - return get(id, AllFields.NAME, DocumentProtocol.Priority.NORMAL_1, timeout); + public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri, Duration timeout) { + return get(id, parameters().withFieldSet(fieldSet).withPriority(pri), timeout); } @Override - public Document get(DocumentId id, String fieldSet, DocumentProtocol.Priority pri, Duration timeout) { - GetDocumentMessage msg = new GetDocumentMessage(id, fieldSet); - msg.setPriority(pri); + public Document get(DocumentId id, DocumentOperationParameters parameters, Duration timeout) { + GetDocumentMessage msg = new GetDocumentMessage(id, parameters.fieldSet().orElse(AllFields.NAME)); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_1); Reply reply = syncSend(msg, timeout != null ? timeout : defaultTimeout); if (reply.hasErrors()) { @@ -161,15 +166,18 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re @Override public boolean remove(DocumentRemove documentRemove) { - RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId()); - msg.setCondition(documentRemove.getCondition()); - return remove(msg); + return remove(documentRemove, parameters()); } @Override public boolean remove(DocumentRemove documentRemove, DocumentProtocol.Priority pri) { + return remove(documentRemove, parameters().withPriority(pri)); + } + + @Override + public boolean remove(DocumentRemove documentRemove, DocumentOperationParameters parameters) { RemoveDocumentMessage msg = new RemoveDocumentMessage(documentRemove.getId()); - msg.setPriority(pri); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_2); msg.setCondition(documentRemove.getCondition()); return remove(msg); } @@ -187,13 +195,18 @@ public class MessageBusSyncSession implements MessageBusSession, SyncSession, Re @Override public boolean update(DocumentUpdate update) { - return update(update, DocumentProtocol.Priority.NORMAL_2); + return update(update, parameters()); } @Override public boolean update(DocumentUpdate update, DocumentProtocol.Priority pri) { + return update(update, parameters().withPriority(pri)); + } + + @Override + public boolean update(DocumentUpdate update, DocumentOperationParameters parameters) { UpdateDocumentMessage msg = new UpdateDocumentMessage(update); - msg.setPriority(pri); + setParameters(msg, parameters, DocumentProtocol.Priority.NORMAL_2); Reply reply = syncSend(msg); if (reply.hasErrors()) { throw new DocumentAccessException(MessageBusAsyncSession.getErrorMessage(reply), reply.getErrorCodes()); diff --git a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingSpec.java b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingSpec.java index 032f13a008a..b5cbfd8224e 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingSpec.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/routing/RoutingSpec.java @@ -195,7 +195,6 @@ public class RoutingSpec { return ret.toString(); } - // Overrides Object. @Override public String toString() { StringBuilder ret = new StringBuilder(); @@ -203,7 +202,6 @@ public class RoutingSpec { return ret.toString(); } - // Overrides Object. @Override public boolean equals(Object obj) { if (!(obj instanceof RoutingSpec)) { @@ -218,7 +216,7 @@ public class RoutingSpec { @Override public int hashCode() { - int result = tables != null ? tables.hashCode() : 0; + int result = tables.hashCode(); result = 31 * result + (verify ? 1 : 0); return result; } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java index 85b714b3b42..9bd2d66fa91 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/XMLFeeder.java @@ -12,7 +12,7 @@ import java.io.InputStream; * access point. * * @author Thomas Gundersen - * @author steinar + * @author Steinar Knutsen */ public class XMLFeeder extends Feeder { public XMLFeeder(DocumentTypeManager docMan, SimpleFeedAccess sender, InputStream stream) { |