diff options
29 files changed, 2792 insertions, 165 deletions
diff --git a/application/abi-spec.json b/application/abi-spec.json index d3e7ab6daef..690facffae7 100644 --- a/application/abi-spec.json +++ b/application/abi-spec.json @@ -271,6 +271,17 @@ "public static final enum com.yahoo.application.Networking disable" ] }, + "com.yahoo.application.container.DocumentAccesses": { + "superClass": "java.lang.Object", + "interfaces": [], + "attributes": [ + "public" + ], + "methods": [ + "public static com.yahoo.documentapi.local.LocalDocumentAccess createFromSchemas(java.lang.String)" + ], + "fields": [] + }, "com.yahoo.application.container.DocumentProcessing": { "superClass": "java.lang.Object", "interfaces": [], diff --git a/application/src/main/java/com/yahoo/application/container/DocumentAccesses.java b/application/src/main/java/com/yahoo/application/container/DocumentAccesses.java new file mode 100644 index 00000000000..c0edad2baa6 --- /dev/null +++ b/application/src/main/java/com/yahoo/application/container/DocumentAccesses.java @@ -0,0 +1,44 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.application.container; + +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.DocumentAccessParams; +import com.yahoo.documentapi.local.LocalDocumentAccess; +import com.yahoo.searchdefinition.derived.Deriver; + +import java.io.File; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; + +/** + * Utility for working with a {@link LocalDocumentAccess} for unit testing components which require a {@link DocumentAccess}. + * + * @author jonmv + */ +public class DocumentAccesses { + + private DocumentAccesses() { } + + /** + * Reads the {@code .sd} files in the given directory, and returns a {@link LocalDocumentAccess} with these document types. + * <br> + * Example usage: + * <pre> + * LocalDocumentAccess access = DocumentAccesses.ofSchemas("src/main/application/schemas"); + * </pre> + */ + public static LocalDocumentAccess createFromSchemas(String schemaDirectory) { + File[] schemasFiles = new File(schemaDirectory).listFiles(name -> name.toString().endsWith(".sd")); + if (schemasFiles == null) + throw new IllegalArgumentException(schemaDirectory + " is not a directory"); + if (schemasFiles.length == 0) + throw new IllegalArgumentException("No schema files found under " + schemaDirectory); + DocumentmanagerConfig config = Deriver.getDocumentManagerConfig(Stream.of(schemasFiles) + .map(File::toString) + .collect(toList())).build(); + return new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(config)); + } + +} diff --git a/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java b/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java new file mode 100644 index 00000000000..cfb48339dbe --- /dev/null +++ b/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java @@ -0,0 +1,54 @@ +package com.yahoo.container.core; + +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.application.BindingMatch; +import com.yahoo.jdisc.application.UriPattern; +import com.yahoo.jdisc.handler.ResponseHandler; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Common HTTP request handler metrics code. + * + * @author jonmv + */ +public class HandlerMetricContextUtil { + + public static void onHandle(Request request, Metric metric, Class<?> handlerClass) { + metric.add("handled.requests", 1, contextFor(request, metric, handlerClass)); + } + + public static void onHandled(Request request, Metric metric, Class<?> handlerClass) { + metric.set("handled.latency", request.timeElapsed(TimeUnit.MILLISECONDS), contextFor(request, metric, handlerClass)); + } + + public static Metric.Context contextFor(Request request, Metric metric, Class<?> handlerClass) { + return contextFor(request, Map.of(), metric, handlerClass); + } + + public static Metric.Context contextFor(Request request, Map<String, String> extraDimensions, Metric metric, Class<?> handlerClass) { + BindingMatch<?> match = request.getBindingMatch(); + if (match == null) return null; + UriPattern matched = match.matched(); + if (matched == null) return null; + String name = matched.toString(); + String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null; + + Map<String, String> dimensions = new HashMap<>(); + dimensions.put("handler", name); + if (endpoint != null) { + dimensions.put("endpoint", endpoint); + } + URI uri = request.getUri(); + dimensions.put("scheme", uri.getScheme()); + dimensions.put("port", Integer.toString(uri.getPort())); + dimensions.put("handler-name", handlerClass.getName()); + dimensions.putAll(extraDimensions); + return metric.createContext(dimensions); + } + +} diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java b/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java index 8243ad07760..a3e264c16ee 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java @@ -127,7 +127,7 @@ public class RequestHandlerTestDriver implements AutoCloseable { public String read() { ByteBuffer nextBuffer = content.read(); if (nextBuffer == null) return null; // end of transmission - return Charset.forName("utf-8").decode(nextBuffer).toString(); + return StandardCharsets.UTF_8.decode(nextBuffer).toString(); } /** Returns the number of bytes available in the handler right now */ diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java index 9387e03e11b..323935e2a26 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java @@ -6,8 +6,6 @@ import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.application.BindingMatch; -import com.yahoo.jdisc.application.UriPattern; import com.yahoo.jdisc.handler.AbstractRequestHandler; import com.yahoo.jdisc.handler.BufferedContentChannel; import com.yahoo.jdisc.handler.ContentChannel; @@ -15,10 +13,9 @@ import com.yahoo.jdisc.handler.OverloadException; import com.yahoo.jdisc.handler.ReadableContentChannel; import com.yahoo.jdisc.handler.ResponseDispatch; import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.container.core.HandlerMetricContextUtil; -import java.net.URI; import java.time.Duration; -import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; @@ -79,29 +76,9 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { } Metric.Context contextFor(Request request, Map<String, String> extraDimensions) { - BindingMatch<?> match = request.getBindingMatch(); - if (match == null) return null; - UriPattern matched = match.matched(); - if (matched == null) return null; - String name = matched.toString(); - String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null; - - Map<String, String> dimensions = new HashMap<>(); - dimensions.put("handler", name); - if (endpoint != null) { - dimensions.put("endpoint", endpoint); - } - URI uri = request.getUri(); - dimensions.put("scheme", uri.getScheme()); - dimensions.put("port", Integer.toString(uri.getPort())); - String handlerClassName = getClass().getName(); - dimensions.put("handler-name", handlerClassName); - dimensions.putAll(extraDimensions); - return this.metric.createContext(dimensions); + return HandlerMetricContextUtil.contextFor(request, extraDimensions, metric, getClass()); } - private Metric.Context contextFor(Request request) { return contextFor(request, Map.of()); } - /** * Handles a request by assigning a worker thread to it. * @@ -109,7 +86,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { */ @Override public final ContentChannel handleRequest(Request request, ResponseHandler responseHandler) { - metric.add("handled.requests", 1, contextFor(request)); + HandlerMetricContextUtil.onHandle(request, metric, getClass()); if (request.getTimeout(TimeUnit.SECONDS) == null) { Duration timeout = getTimeout(); if (timeout != null) { @@ -212,8 +189,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { public ContentChannel handleResponse(Response response) { if ( tryHasResponded()) throw new IllegalStateException("Response already handled"); ContentChannel cc = responseHandler.handleResponse(response); - long millis = request.timeElapsed(TimeUnit.MILLISECONDS); - metric.set("handled.latency", millis, contextFor(request)); + HandlerMetricContextUtil.onHandled(request, metric, getClass()); return cc; } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java index f3b4d1f6457..407256b19c8 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java @@ -668,10 +668,9 @@ public class ControllerTest { assertEquals(routingMethods, Set.of(RoutingMethod.shared, RoutingMethod.sharedLayer4)); // Deployment has stored application meta. - assertArrayEquals(applicationPackage.metaDataZip(), - tester.controllerTester().serviceRegistry().applicationStore() - .getMeta(new DeploymentId(context.instanceId(), zone)) - .get(tester.clock().instant())); + assertNotNull(tester.controllerTester().serviceRegistry().applicationStore() + .getMeta(new DeploymentId(context.instanceId(), zone)) + .get(tester.clock().instant())); // Meta data tombstone placed on delete tester.clock().advance(Duration.ofSeconds(1)); diff --git a/document/abi-spec.json b/document/abi-spec.json index e53cf09f07e..c9191aa2fdb 100644 --- a/document/abi-spec.json +++ b/document/abi-spec.json @@ -399,6 +399,8 @@ "public com.yahoo.document.DocumentId getId()", "public void <init>(com.yahoo.document.DocumentPut)", "public void <init>(com.yahoo.document.DocumentPut, com.yahoo.document.Document)", + "public boolean equals(java.lang.Object)", + "public int hashCode()", "public java.lang.String toString()" ], "fields": [] @@ -1929,6 +1931,8 @@ "public java.lang.String getSelection()", "public boolean isPresent()", "public static com.yahoo.document.TestAndSetCondition fromConditionString(java.util.Optional)", + "public boolean equals(java.lang.Object)", + "public int hashCode()", "public java.lang.String toString()" ], "fields": [ diff --git a/document/src/main/java/com/yahoo/document/DocumentPut.java b/document/src/main/java/com/yahoo/document/DocumentPut.java index c5ce2e7e181..e24388cd65f 100644 --- a/document/src/main/java/com/yahoo/document/DocumentPut.java +++ b/document/src/main/java/com/yahoo/document/DocumentPut.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document; +import java.util.Objects; + /** * @author Vegard Sjonfjell */ @@ -47,6 +49,20 @@ public class DocumentPut extends DocumentOperation { } @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DocumentPut that = (DocumentPut) o; + return document.equals(that.document) && + getCondition().equals(that.getCondition()); + } + + @Override + public int hashCode() { + return Objects.hash(document, getCondition()); + } + + @Override public String toString() { return "put of document " + getId(); } diff --git a/document/src/main/java/com/yahoo/document/TestAndSetCondition.java b/document/src/main/java/com/yahoo/document/TestAndSetCondition.java index 6a189fc2969..a582807e38c 100644 --- a/document/src/main/java/com/yahoo/document/TestAndSetCondition.java +++ b/document/src/main/java/com/yahoo/document/TestAndSetCondition.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document; +import java.util.Objects; import java.util.Optional; /** @@ -43,6 +44,19 @@ public class TestAndSetCondition { } @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TestAndSetCondition that = (TestAndSetCondition) o; + return conditionStr.equals(that.conditionStr); + } + + @Override + public int hashCode() { + return Objects.hash(conditionStr); + } + + @Override public String toString() { StringBuilder string = new StringBuilder(); string.append("condition '"); diff --git a/document/src/main/java/com/yahoo/document/json/TokenBuffer.java b/document/src/main/java/com/yahoo/document/json/TokenBuffer.java index e20845bfa54..88353139b0f 100644 --- a/document/src/main/java/com/yahoo/document/json/TokenBuffer.java +++ b/document/src/main/java/com/yahoo/document/json/TokenBuffer.java @@ -129,7 +129,7 @@ public class TokenBuffer { add(t, tokens.getCurrentName(), tokens.getText()); } catch (IOException e) { // TODO something sane - throw new RuntimeException(e); + throw new IllegalArgumentException(e); } } @@ -138,7 +138,7 @@ public class TokenBuffer { return tokens.nextValue(); } catch (IOException e) { // TODO something sane - throw new RuntimeException(e); + throw new IllegalArgumentException(e); } } diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index f5f2a7c1845..a204da107f0 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -167,10 +167,15 @@ "public com.yahoo.documentapi.DocumentOperationParameters withFieldSet(java.lang.String)", "public com.yahoo.documentapi.DocumentOperationParameters withRoute(java.lang.String)", "public com.yahoo.documentapi.DocumentOperationParameters withTraceLevel(int)", + "public com.yahoo.documentapi.DocumentOperationParameters withResponseHandler(com.yahoo.documentapi.ResponseHandler)", "public java.util.Optional priority()", "public java.util.Optional fieldSet()", "public java.util.Optional route()", - "public java.util.OptionalInt traceLevel()" + "public java.util.OptionalInt traceLevel()", + "public java.util.Optional responseHandler()", + "public boolean equals(java.lang.Object)", + "public int hashCode()", + "public java.lang.String toString()" ], "fields": [] }, @@ -969,17 +974,17 @@ "public void <init>(com.yahoo.documentapi.AsyncParameters, com.yahoo.documentapi.local.LocalDocumentAccess)", "public double getCurrentWindowSize()", "public com.yahoo.documentapi.Result put(com.yahoo.document.Document)", - "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId)", - "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, boolean, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", - "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId)", - "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate)", - "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", + "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.DocumentOperationParameters)", "public com.yahoo.documentapi.Response getNext()", "public com.yahoo.documentapi.Response getNext(int)", - "public void destroy()" + "public void destroy()", + "public void setResultType(com.yahoo.documentapi.Result$ResultType)" ], "fields": [] }, @@ -991,12 +996,16 @@ ], "methods": [ "public void <init>(com.yahoo.documentapi.DocumentAccessParams)", - "public com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)", - "public com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)", - "public com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)", + "public com.yahoo.documentapi.local.LocalSyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)", + "public com.yahoo.documentapi.local.LocalAsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)", + "public com.yahoo.documentapi.local.LocalVisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)", "public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)", "public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)", - "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)" + "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)", + "public void setPhaser(java.util.concurrent.Phaser)", + "public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)", + "public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)", + "public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)" ], "fields": [] }, diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java index 3258c2f5b2c..1d934680586 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java @@ -5,6 +5,7 @@ import com.yahoo.document.fieldset.FieldSet; import com.yahoo.document.fieldset.FieldSetRepo; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; @@ -17,18 +18,21 @@ import static java.util.Objects.requireNonNull; */ public class DocumentOperationParameters { - private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1); + private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null); private final DocumentProtocol.Priority priority; private final String fieldSet; private final String route; private final int traceLevel; + private final ResponseHandler responseHandler; - private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, int traceLevel) { + private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, + int traceLevel, ResponseHandler responseHandler) { this.priority = priority; this.fieldSet = fieldSet; this.route = route; this.traceLevel = traceLevel; + this.responseHandler = responseHandler; } public static DocumentOperationParameters parameters() { @@ -37,22 +41,22 @@ public class DocumentOperationParameters { /** Sets the priority with which to perform an operation. */ public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) { - return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel); + return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, responseHandler); } /** Sets the field set used for retrieval. */ public DocumentOperationParameters withFieldSet(FieldSet fieldSet) { - return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel); + return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, responseHandler); } /** Sets the field set used for retrieval. */ public DocumentOperationParameters withFieldSet(String fieldSet) { - return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel); + return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, responseHandler); } /** Sets the route along which to send the operation. */ public DocumentOperationParameters withRoute(String route) { - return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel); + return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, responseHandler); } /** Sets the trace level for an operation. */ @@ -60,12 +64,44 @@ public class DocumentOperationParameters { if (traceLevel < 0 || traceLevel > 9) throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)"); - return new DocumentOperationParameters(priority, fieldSet, route, traceLevel); + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, responseHandler); + } + + /** Sets the {@link ResponseHandler} to handle the {@link Response} of an async operation, instead of the session default. */ + public DocumentOperationParameters withResponseHandler(ResponseHandler responseHandler) { + return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(responseHandler)); } public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); } public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); } public Optional<String> route() { return Optional.ofNullable(route); } public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); } + public Optional<ResponseHandler> responseHandler() { return Optional.ofNullable(responseHandler); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DocumentOperationParameters that = (DocumentOperationParameters) o; + return traceLevel == that.traceLevel && + priority == that.priority && + Objects.equals(fieldSet, that.fieldSet) && + Objects.equals(route, that.route); + } + + @Override + public int hashCode() { + return Objects.hash(priority, fieldSet, route, traceLevel); + } + + @Override + public String toString() { + return "DocumentOperationParameters{" + + "priority=" + priority + + ", fieldSet='" + fieldSet + '\'' + + ", route='" + route + '\'' + + ", traceLevel=" + traceLevel + + '}'; + } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 40f26a82a89..8781e4a3a51 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -9,6 +9,7 @@ import com.yahoo.document.DocumentUpdate; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentIdResponse; +import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; import com.yahoo.documentapi.DocumentUpdateResponse; import com.yahoo.documentapi.RemoveResponse; @@ -18,32 +19,39 @@ import com.yahoo.documentapi.Result; import com.yahoo.documentapi.SyncParameters; import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.UpdateResponse; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; +import static com.yahoo.documentapi.Result.ResultType.SUCCESS; /** * @author bratseth + * @author jonmv */ public class LocalAsyncSession implements AsyncSession { - private final List<Response> responses = new LinkedList<>(); + private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(); private final ResponseHandler handler; private final SyncSession syncSession; - private long requestId = 0; - private Random random = new Random(); + private final Executor executor = Executors.newCachedThreadPool(); + private final AtomicReference<Phaser> phaser; - private synchronized long getNextRequestId() { - requestId++; - return requestId; - } + private AtomicLong requestId = new AtomicLong(0); + private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS); public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) { this.handler = params.getResponseHandler(); - random.setSeed(System.currentTimeMillis()); - syncSession = access.createSyncSession(new SyncParameters.Builder().build()); + this.syncSession = access.createSyncSession(new SyncParameters.Builder().build()); + this.phaser = access.phaser; } @Override @@ -53,87 +61,85 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result put(Document document) { - return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3); + return put(new DocumentPut(document), parameters()); } @Override - public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - try { - syncSession.put(documentPut, pri); - addResponse(new DocumentResponse(req, documentPut.getDocument())); - } catch (Exception e) { - addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR)); - } - return new Result(req); + public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) { + return send(req -> { + try { + syncSession.put(documentPut, parameters); + return new DocumentResponse(req, documentPut.getDocument()); + } + catch (Exception e) { + return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR); + } + }, + parameters); } @Override public Result get(DocumentId id) { - return get(id, false, DocumentProtocol.Priority.NORMAL_3); + return get(id, parameters()); } @Override - @Deprecated // TODO: Remove on Vespa 8 - public Result get(DocumentId id, boolean headersOnly, DocumentProtocol.Priority pri) { - return get(id, pri); - } - - @Override - public Result get(DocumentId id, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - try { - addResponse(new DocumentResponse(req, syncSession.get(id))); - } catch (Exception e) { - addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR)); - } - return new Result(req); + public Result get(DocumentId id, DocumentOperationParameters parameters) { + return send(req -> { + try { + return new DocumentResponse(req, syncSession.get(id, parameters, null)); + } + catch (Exception e) { + return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR); + } + }, + parameters); } @Override public Result remove(DocumentId id) { - return remove(id, DocumentProtocol.Priority.NORMAL_3); + return remove(id, parameters()); } @Override - public Result remove(DocumentId id, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - if (syncSession.remove(new DocumentRemove(id), pri)) { - addResponse(new RemoveResponse(req, true)); - } else { - addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND)); - } - return new Result(req); + public Result remove(DocumentId id, DocumentOperationParameters parameters) { + return send(req -> { + if (syncSession.remove(new DocumentRemove(id), parameters)) { + return new RemoveResponse(req, true); + } + else { + return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND); + } + }, + parameters); } @Override public Result update(DocumentUpdate update) { - return update(update, DocumentProtocol.Priority.NORMAL_3); + return update(update, parameters()); } @Override - public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - if (syncSession.update(update, pri)) { - addResponse(new UpdateResponse(req, true)); - } else { - addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND)); - } - return new Result(req); + public Result update(DocumentUpdate update, DocumentOperationParameters parameters) { + return send(req -> { + if (syncSession.update(update, parameters)) { + return new UpdateResponse(req, true); + } + else { + return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND); + } + }, + parameters); } @Override public Response getNext() { - if (responses.isEmpty()) { - return null; - } - int index = random.nextInt(responses.size()); - return responses.remove(index); + return responses.poll(); } @Override - public Response getNext(int timeout) { - return getNext(); + public Response getNext(int timeoutMilliseconds) throws InterruptedException { + return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS); } @Override @@ -141,6 +147,11 @@ public class LocalAsyncSession implements AsyncSession { // empty } + /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Responses to appear. */ + public void setResultType(Result.ResultType resultType) { + this.result.set(resultType); + } + private void addResponse(Response response) { if (handler != null) { handler.handleResponse(response); @@ -149,4 +160,29 @@ public class LocalAsyncSession implements AsyncSession { } } + private Result send(Function<Long, Response> responses, DocumentOperationParameters parameters) { + Result.ResultType resultType = result.get(); + if (resultType != SUCCESS) + return new Result(resultType, new Error()); + + ResponseHandler responseHandler = parameters.responseHandler().orElse(this::addResponse); + long req = requestId.incrementAndGet(); + Phaser synchronizer = phaser.get(); + if (synchronizer == null) + responseHandler.handleResponse(responses.apply(req)); + else { + synchronizer.register(); + executor.execute(() -> { + try { + synchronizer.arriveAndAwaitAdvance(); + responseHandler.handleResponse(responses.apply(req)); + } + finally { + synchronizer.awaitAdvance(synchronizer.arriveAndDeregister()); + } + }); + } + return new Result(req); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index c69a8fb48de..6a0f2d6afc2 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -11,7 +11,6 @@ import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.SubscriptionParameters; import com.yahoo.documentapi.SubscriptionSession; import com.yahoo.documentapi.SyncParameters; -import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.VisitorDestinationParameters; import com.yahoo.documentapi.VisitorDestinationSession; import com.yahoo.documentapi.VisitorParameters; @@ -19,32 +18,38 @@ import com.yahoo.documentapi.VisitorSession; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicReference; /** - * The main class of the local implementation of the document api + * The main class of the local implementation of the document api. + * To easily obtain an instance of this, with the documents using the schemas (.sd-files) in a given directoy, + * use the {@code com.yahoo.vespa.application} test module and {@code DocumentAccesses.ofSchemas(schemaDirectory)} * * @author bratseth + * @author jonmv */ public class LocalDocumentAccess extends DocumentAccess { - Map<DocumentId, Document> documents = new ConcurrentHashMap<>(); + final Map<DocumentId, Document> documents = new ConcurrentHashMap<>(); + final AtomicReference<Phaser> phaser = new AtomicReference<>(); public LocalDocumentAccess(DocumentAccessParams params) { super(params); } @Override - public SyncSession createSyncSession(SyncParameters parameters) { + public LocalSyncSession createSyncSession(SyncParameters parameters) { return new LocalSyncSession(this); } @Override - public AsyncSession createAsyncSession(AsyncParameters parameters) { + public LocalAsyncSession createAsyncSession(AsyncParameters parameters) { return new LocalAsyncSession(parameters, this); } @Override - public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { + public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { return new LocalVisitorSession(this, parameters); } @@ -63,4 +68,42 @@ public class LocalDocumentAccess extends DocumentAccess { throw new UnsupportedOperationException("Not supported yet"); } + /** + * Sets a {@link Phaser} for synchronization of otherwise async operations in sessions backed by this. + * + * {@link AsyncSession} and {@link VisitorSession} are by nature async. The {@link LocalAsyncSession} is, by default, + * synchronous, i.e., responses are sent by the thread that sends the document operations. {@link LocalVisitorSession}, + * on the other hand, is asynchronous by default, i.e., all documents are sent by a dedicated sender thread. + * To enable more advanced testing using the {@link LocalDocumentAccess}, this method lets the user specify a + * {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the + * document operations — which are then also done by a dedicated thread pool, instead of the caller thread. + * + * When this is set, a party is registered with the phaser for the sender thread (visit) or for each document + * operation (async-session). The thread that sends a document (visit) or response (async-session) then arrives + * and awaits advance before sending each response, so the user can trigger these documents and responses. + * After the document or response is delivered, the thread arrives and awaits advance, so the user + * can wait until the document or response has been delivered. This also ensures memory visibility. + * The visit sender thread deregisters when the whole visit is done; the async session threads after each operation. + * Example usage: + * + * <pre> {@code + * void testOperations(LocalDocumentAccess access) { + * List<Response> responses = new ArrayList<>(); + * Phaser phaser = new Phaser(1); // "1" to register self + * access.setPhaser(phaser); + * AsyncSession session = access.createAsyncSession(new AsyncParameters().setReponseHandler(responses::add)); + * session.put(documentPut); + * session.get(documentId); + * // Operations wait for this thread to arrive at "phaser" + * phaser.arriveAndAwaitAdvance(); // Let operations send their responses + * // "responses" may or may not hold the responses now + * phaser.arriveAndAwaitAdvance(); // Wait for operations to complete sending responses, memory visibility, etc. + * // "responses" now has responses from all previous operations + * phaser.arriveAndDeregister(); // Deregister so further operations flow freely + * }}</pre> + */ + public void setPhaser(Phaser phaser) { + this.phaser.set(phaser); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index f087b646ca4..e0ae0278de8 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -23,6 +23,7 @@ import com.yahoo.yolean.Exceptions; import java.util.Comparator; import java.util.Map; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; /** @@ -42,6 +43,7 @@ public class LocalVisitorSession implements VisitorSession { private final DocumentSelector selector; private final FieldSet fieldSet; private final AtomicReference<State> state; + private final AtomicReference<Phaser> phaser; public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException { if (parameters.getResumeToken() != null) @@ -64,11 +66,16 @@ public class LocalVisitorSession implements VisitorSession { this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString)); this.outstanding.putAll(access.documents); this.state = new AtomicReference<>(State.RUNNING); + this.phaser = access.phaser; start(); } void start() { + Phaser synchronizer = phaser.get(); + if (synchronizer != null) + synchronizer.register(); + new Thread(() -> { try { // Iterate through all documents and pass on to data handler @@ -76,14 +83,26 @@ public class LocalVisitorSession implements VisitorSession { if (state.get() != State.RUNNING) return; - if (selector.accepts(new DocumentPut(document)) != Result.TRUE) + try { + if (selector.accepts(new DocumentPut(document)) != Result.TRUE) + return; + } + catch (RuntimeException e) { return; + } Document copy = new Document(document.getDataType(), document.getId()); new FieldSetRepo().copyFields(document, copy, fieldSet); + + if (synchronizer != null) + synchronizer.arriveAndAwaitAdvance(); + data.onMessage(new PutDocumentMessage(new DocumentPut(copy)), new AckToken(id)); + + if (synchronizer != null) + synchronizer.arriveAndAwaitAdvance(); }); // Transition to a terminal state when done state.updateAndGet(current -> { @@ -107,6 +126,9 @@ public class LocalVisitorSession implements VisitorSession { control.onDone(VisitorControlHandler.CompletionCode.FAILURE, Exceptions.toMessageString(e)); } finally { + if (synchronizer != null) + synchronizer.arriveAndDeregister(); + data.onDone(); } }).start(); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java index 7a71089c180..0a4ab66aea4 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java @@ -184,7 +184,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { Result send(Message msg, DocumentOperationParameters parameters) { try { long reqId = requestId.incrementAndGet(); - msg.setContext(reqId); + msg.setContext(new OperationContext(reqId, parameters.responseHandler().orElse(null))); msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel)); // Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew! String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route); @@ -198,6 +198,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { } } + private static class OperationContext { + private final long reqId; + private final ResponseHandler responseHandler; + private OperationContext(long reqId, ResponseHandler responseHandler) { + this.reqId = reqId; + this.responseHandler = responseHandler; + } + } + /** * A convenience method for assigning the internal trace level and route string to a message before sending it * through the internal mbus session object. @@ -206,7 +215,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { * @return the document api result object. */ public Result send(Message msg) { - return send(msg, null); + return send(msg, parameters()); } @Override @@ -285,11 +294,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { new Error(mbusResult.getError().getMessage() + " (" + mbusResult.getError().getCode() + ")")); } - private static Response toResponse(Reply reply) { - long reqId = (Long) reply.getContext(); - return reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId); - } - private static Response toError(Reply reply, long reqId) { boolean definitelyNotFound = reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound() || reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound(); @@ -346,8 +350,13 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession { if (reply.getTrace().getLevel() > 0) { log.log(Level.INFO, reply.getTrace().toString()); } - Response response = toResponse(reply); - if (handler != null) { + OperationContext context = (OperationContext) reply.getContext(); + long reqId = context.reqId; + Response response = reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId); + ResponseHandler operationSpecificResponseHandler = context.responseHandler; + if (operationSpecificResponseHandler != null) + operationSpecificResponseHandler.handleResponse(response); + else if (handler != null) { handler.handleResponse(response); } else { queue.add(response); diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java index 69dc7c6da74..b4e17038a35 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -36,6 +36,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -85,17 +91,22 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { } @Test - public void testAsyncFetch() { - AsyncSession session = access.createAsyncSession(new AsyncParameters()); + public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException { + LocalAsyncSession session = access.createAsyncSession(new AsyncParameters()); List<DocumentId> ids = new ArrayList<>(); ids.add(new DocumentId("id:music:music::1")); ids.add(new DocumentId("id:music:music::2")); ids.add(new DocumentId("id:music:music::3")); for (DocumentId id : ids) session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id)); - int timeout = 100; + + // Let all async operations wait for a signal from the test thread before sending their responses, and let test + // thread wait for all responses to be delivered afterwards. + Phaser phaser = new Phaser(1); + access.setPhaser(phaser); long startTime = System.currentTimeMillis(); + int timeoutMillis = 1000; Set<Long> outstandingRequests = new HashSet<>(); for (DocumentId id : ids) { Result result = session.get(id); @@ -104,27 +115,38 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { outstandingRequests.add(result.getRequestId()); } - List<Document> documents = new ArrayList<>(); - try { - while ( ! outstandingRequests.isEmpty()) { - int timeSinceStart = (int)(System.currentTimeMillis() - startTime); - Response response = session.getNext(timeout - timeSinceStart); - if (response == null) - throw new RuntimeException("Timed out waiting for documents"); // or return what you have - if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore - - if (response.isSuccess()) - documents.add(((DocumentResponse)response).getDocument()); - outstandingRequests.remove(response.getRequestId()); + // Wait for responses in separate thread. + Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> { + try { + List<Document> documents = new ArrayList<>(); + while ( ! outstandingRequests.isEmpty()) { + int timeSinceStart = (int) (System.currentTimeMillis() - startTime); + Response response = session.getNext(timeoutMillis - timeSinceStart); + if (response == null) + throw new RuntimeException("Timed out waiting for documents"); // or return what you have + if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore + + if (response.isSuccess()) + documents.add(((DocumentResponse) response).getDocument()); + outstandingRequests.remove(response.getRequestId()); + } + assertEquals(3, documents.size()); + for (Document document : documents) + assertNotNull(document); } - } - catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for documents", e); - } + catch (InterruptedException e) { + throw new IllegalArgumentException("Interrupted while waiting for responses"); + } + }); + + // All operations, and receiver, now waiting for this thread to arrive. + assertEquals(4, phaser.getRegisteredParties()); + assertEquals(0, phaser.getPhase()); + phaser.arrive(); + assertEquals(1, phaser.getPhase()); + phaser.awaitAdvance(phaser.arriveAndDeregister()); - assertEquals(3, documents.size()); - for (Document document : documents) - assertNotNull(document); + futureWithAssertions.get(1000, TimeUnit.MILLISECONDS); } @Test diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java index 4a87217e08f..eebaf1f579f 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java @@ -7,19 +7,33 @@ import com.google.inject.binder.AnnotatedBindingBuilder; import com.yahoo.jdisc.Container; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.Response; -import com.yahoo.jdisc.application.*; +import com.yahoo.jdisc.application.Application; +import com.yahoo.jdisc.application.ContainerActivator; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.jdisc.application.DeactivatedContainer; +import com.yahoo.jdisc.application.OsgiFramework; import com.yahoo.jdisc.core.ApplicationLoader; import com.yahoo.jdisc.core.BootstrapLoader; import com.yahoo.jdisc.core.FelixFramework; import com.yahoo.jdisc.core.FelixParams; -import com.yahoo.jdisc.handler.*; +import com.yahoo.jdisc.handler.BindingNotFoundException; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDeniedException; +import com.yahoo.jdisc.handler.RequestDispatch; +import com.yahoo.jdisc.handler.RequestHandler; +import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.service.CurrentContainer; import java.net.URI; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; /** @@ -27,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; * {@link BootstrapLoader} that provides convenient access to the {@link ContainerActivator} and {@link * CurrentContainer} interfaces. A typical test case using this class looks as follows:</p> * <pre> - * {@literal @}Test + *{@literal @}Test * public void requireThatMyComponentIsWellBehaved() { * TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi(); * ContainerBuilder builder = driver.newContainerBuilder(); diff --git a/testutil/src/main/java/com/yahoo/test/ManualClock.java b/testutil/src/main/java/com/yahoo/test/ManualClock.java index ffef6895c38..ba7d9698d72 100644 --- a/testutil/src/main/java/com/yahoo/test/ManualClock.java +++ b/testutil/src/main/java/com/yahoo/test/ManualClock.java @@ -9,6 +9,7 @@ import java.time.ZoneId; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAmount; +import java.util.concurrent.atomic.AtomicReference; /** * A clock which initially has the time of its creation but can only be advanced by calling advance @@ -17,7 +18,7 @@ import java.time.temporal.TemporalAmount; */ public class ManualClock extends Clock { - private Instant currentTime = Instant.now(); + private AtomicReference<Instant> currentTime = new AtomicReference<>(Instant.now()); @Inject public ManualClock() {} @@ -27,19 +28,19 @@ public class ManualClock extends Clock { } public ManualClock(Instant currentTime) { - this.currentTime = currentTime; + setInstant(currentTime); } public void advance(TemporalAmount temporal) { - currentTime = currentTime.plus(temporal); + currentTime.updateAndGet(time -> time.plus(temporal)); } public void setInstant(Instant time) { - currentTime = time; + currentTime.set(time); } @Override - public Instant instant() { return currentTime; } + public Instant instant() { return currentTime.get(); } @Override public ZoneId getZone() { return null; } @@ -48,7 +49,7 @@ public class ManualClock extends Clock { public Clock withZone(ZoneId zone) { return null; } @Override - public long millis() { return currentTime.toEpochMilli(); } + public long millis() { return instant().toEpochMilli(); } public static Instant at(String utcIsoTime) { return LocalDateTime.parse(utcIsoTime, DateTimeFormatter.ISO_DATE_TIME).atZone(ZoneOffset.UTC).toInstant(); diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java index 57b28cecdca..f69bdc2a91d 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java @@ -240,7 +240,7 @@ class ApacheGatewayConnection implements GatewayConnection { httpPost.addHeader(headerName, headerValue); }); - if (useCompression) { + if (useCompression) { // This causes the apache client to gzip the request content. Weird, huh? httpPost.setHeader("Content-Encoding", "gzip"); } return httpPost; diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml index 9c4b81da806..8254c208588 100644 --- a/vespaclient-container-plugin/pom.xml +++ b/vespaclient-container-plugin/pom.xml @@ -72,6 +72,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>testutil</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java new file mode 100644 index 00000000000..8b0c966c46f --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java @@ -0,0 +1,310 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.FixedBucketSpaces; +import com.yahoo.document.fieldset.AllFields; +import com.yahoo.documentapi.DocumentOperationParameters; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.text.Text; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** + * Wraps the document API with an executor that can retry and time out document operations, + * as well as compute the required visitor parameters for visitor sessions. + * + * @author jonmv + */ +public interface DocumentOperationExecutor { + + default void shutdown() { } + + void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context); + + void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context); + + void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context); + + void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context); + + void visit(VisitorOptions options, VisitOperationsContext context); + + String routeToCluster(String cluster); + + enum ErrorType { + OVERLOAD, + NOT_FOUND, + PRECONDITION_FAILED, + BAD_REQUEST, + TIMEOUT, + ERROR; + } + + + /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */ + class Context<T> { + + private final AtomicBoolean handled = new AtomicBoolean(); + private final BiConsumer<ErrorType, String> onError; + private final Consumer<T> onSuccess; + + Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) { + this.onError = onError; + this.onSuccess = onSuccess; + } + + public void error(ErrorType type, String message) { + if ( ! handled.getAndSet(true)) + onError.accept(type, message); + } + + public void success(T result) { + if ( ! handled.getAndSet(true)) + onSuccess.accept(result); + } + + public boolean handled() { + return handled.get(); + } + + } + + + /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */ + class VisitOperationsContext extends Context<Optional<String>> { + + private final Consumer<Document> onDocument; + + public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) { + super(onError, onSuccess); + this.onDocument = onDocument; + } + + public void document(Document document) { + if ( ! handled()) + onDocument.accept(document); + } + + } + + + /** Context for a document operation. */ + class OperationContext extends Context<Optional<Document>> { + + public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) { + super(onError, onSuccess); + } + + } + + + class VisitorOptions { + + final Optional<String> cluster; + final Optional<String> namespace; + final Optional<String> documentType; + final Optional<Group> group; + final Optional<String> selection; + final Optional<String> fieldSet; + final Optional<String> continuation; + final Optional<String> bucketSpace; + final Optional<Integer> wantedDocumentCount; + final Optional<Integer> concurrency; + + private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace, + Optional<Group> group, Optional<String> selection, Optional<String> fieldSet, + Optional<String> continuation, Optional<String> bucketSpace, + Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) { + this.cluster = cluster; + this.namespace = namespace; + this.documentType = documentType; + this.group = group; + this.selection = selection; + this.fieldSet = fieldSet; + this.continuation = continuation; + this.bucketSpace = bucketSpace; + this.wantedDocumentCount = wantedDocumentCount; + this.concurrency = concurrency; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VisitorOptions that = (VisitorOptions) o; + return cluster.equals(that.cluster) && + namespace.equals(that.namespace) && + documentType.equals(that.documentType) && + group.equals(that.group) && + selection.equals(that.selection) && + fieldSet.equals(that.fieldSet) && + continuation.equals(that.continuation) && + bucketSpace.equals(that.bucketSpace) && + wantedDocumentCount.equals(that.wantedDocumentCount) && + concurrency.equals(that.concurrency); + } + + @Override + public int hashCode() { + return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency); + } + + @Override + public String toString() { + return "VisitorOptions{" + + "cluster=" + cluster + + ", namespace=" + namespace + + ", documentType=" + documentType + + ", group=" + group + + ", selection=" + selection + + ", fieldSet=" + fieldSet + + ", continuation=" + continuation + + ", bucketSpace=" + bucketSpace + + ", wantedDocumentCount=" + wantedDocumentCount + + ", concurrency=" + concurrency + + '}'; + } + + public static Builder builder() { return new Builder(); } + + + public static class Builder { + + private String cluster; + private String documentType; + private String namespace; + private Group group; + private String selection; + private String fieldSet; + private String continuation; + private String bucketSpace; + private Integer wantedDocumentCount; + private Integer concurrency; + + public Builder cluster(String cluster) { + this.cluster = cluster; + return this; + } + + public Builder documentType(String documentType) { + this.documentType = documentType; + return this; + } + + public Builder namespace(String namespace) { + this.namespace = namespace; + return this; + } + + public Builder group(Group group) { + this.group = group; + return this; + } + + public Builder selection(String selection) { + this.selection = selection; + return this; + } + + public Builder fieldSet(String fieldSet) { + this.fieldSet = fieldSet; + return this; + } + + public Builder continuation(String continuation) { + this.continuation = continuation; + return this; + } + + public Builder bucketSpace(String bucketSpace) { + this.bucketSpace = bucketSpace; + return this; + } + + public Builder wantedDocumentCount(Integer wantedDocumentCount) { + this.wantedDocumentCount = wantedDocumentCount; + return this; + } + + public Builder concurrency(Integer concurrency) { + this.concurrency = concurrency; + return this; + } + + public VisitorOptions build() { + return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType), + Optional.ofNullable(namespace), Optional.ofNullable(group), + Optional.ofNullable(selection), Optional.ofNullable(fieldSet), + Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace), + Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency)); + } + + } + + } + + + class Group { + + private final String value; + private final String docIdPart; + private final String selection; + + private Group(String value, String docIdPart, String selection) { + Text.validateTextString(value) + .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); }); + this.value = value; + this.docIdPart = docIdPart; + this.selection = selection; + } + + public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); } + public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); } + + public String value() { return value; } + public String docIdPart() { return docIdPart; } + public String selection() { return selection; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Group group = (Group) o; + return value.equals(group.value) && + docIdPart.equals(group.docIdPart) && + selection.equals(group.selection); + } + + @Override + public int hashCode() { + return Objects.hash(value, docIdPart, selection); + } + + @Override + public String toString() { + return "Group{" + + "value='" + value + '\'' + + ", docIdPart='" + docIdPart + '\'' + + ", selection='" + selection + '\'' + + '}'; + } + + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java new file mode 100644 index 00000000000..135b6a824c8 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java @@ -0,0 +1,496 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi; + +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.FixedBucketSpaces; +import com.yahoo.document.fieldset.AllFields; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.AsyncParameters; +import com.yahoo.documentapi.AsyncSession; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.DocumentOperationParameters; +import com.yahoo.documentapi.DocumentResponse; +import com.yahoo.documentapi.DumpVisitorDataHandler; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.ResponseHandler; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.yolean.Exceptions; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.logging.Logger; +import java.util.stream.Stream; + +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; +import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.SEVERE; +import static java.util.logging.Level.WARNING; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Encapsulates a document access and supports running asynchronous document + * operations and visits against this, with retries and optional timeouts. + * + * @author jonmv + */ +public class DocumentOperationExecutorImpl implements DocumentOperationExecutor { + + private static final Logger log = Logger.getLogger(DocumentOperationExecutorImpl.class.getName()); + + private final Duration visitTimeout; + private final long maxThrottled; + private final DocumentAccess access; + private final AsyncSession asyncSession; + private final Map<String, StorageCluster> clusters; + private final Clock clock; + private final DelayQueue throttled; + private final DelayQueue timeouts; + private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); + + public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig, + DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) { + this(Duration.ofMillis(executorConfig.resendDelayMillis()), + Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()), + Duration.ofSeconds(executorConfig.visitTimeoutSeconds()), + executorConfig.maxThrottled(), + access, + parseClusters(clustersConfig, bucketsConfig), + clock); + } + + DocumentOperationExecutorImpl(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled, + DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) { + this.visitTimeout = requireNonNull(visitTimeout); + this.maxThrottled = maxThrottled; + this.access = requireNonNull(access); + this.asyncSession = access.createAsyncSession(new AsyncParameters()); + this.clock = requireNonNull(clock); + this.clusters = Map.copyOf(clusters); + this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock, "throttle"); + this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> { + context.error(TIMEOUT, "Timed out after " + defaultTimeout); + return true; + }, defaultTimeout, clock, "timeout"); + } + + private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) { + if (options.cluster.isEmpty() && options.documentType.isEmpty()) + throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); + + VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection, + options.documentType, + options.namespace.map(value -> "id.namespace=='" + value + "'"), + options.group.map(Group::selection)) + .flatMap(Optional::stream) + .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right + StringJoiner::add, + StringJoiner::merge) + .toString()); + + options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); + parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME))); + options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); }); + parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10)); + options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); }); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1))); + parameters.setTimeoutMs(visitTimeout.toMillis()); + parameters.visitInconsistentBuckets(true); + parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); + + StorageCluster storageCluster = resolveCluster(options.cluster, clusters); + parameters.setRoute(storageCluster.route()); + parameters.setBucketSpace(resolveBucket(storageCluster, + options.documentType, + List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), + options.bucketSpace)); + + return parameters; + } + + /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */ + @Override + public void shutdown() { + long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli(); + visits.values().forEach(VisitorSession::destroy); + Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30), + context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown")); + Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40), + context -> context.error(TIMEOUT, "Timed out due to shutdown")); + try { + throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); + timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throttleShutdown.cancel(true); + throttleShutdown.cancel(true); + log.log(WARNING, "Exception shutting down " + getClass().getName(), e); + } + } + + @Override + public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.get(id, parameters.withResponseHandler(handlerOf(parameters, context))), context); + } + + @Override + public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.put(put, parameters.withResponseHandler(handlerOf(parameters, context))), context); + } + + @Override + public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.update(update, parameters.withResponseHandler(handlerOf(parameters, context))), context); + } + + @Override + public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.remove(id, parameters.withResponseHandler(handlerOf(parameters, context))), context); + } + + @Override + public void visit(VisitorOptions options, VisitOperationsContext context) { + try { + AtomicBoolean done = new AtomicBoolean(false); + VisitorParameters parameters = asParameters(options, clusters, visitTimeout); + parameters.setLocalDataHandler(new DumpVisitorDataHandler() { + @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); } + @Override public void onRemove(DocumentId id) { } // We don't visit removes here. + }); + parameters.setControlHandler(new VisitorControlHandler() { + @Override public void onDone(CompletionCode code, String message) { + super.onDone(code, message); + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets()) + context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout); + case SUCCESS: // intentional fallthrough + case ABORTED: + context.success(Optional.ofNullable(getProgress()) + .filter(progress -> ! progress.isFinished()) + .map(ProgressToken::serializeToString)); + break; + default: + context.error(ERROR, message != null ? message : "Visiting failed"); + } + done.set(true); // This may be reached before dispatching thread is done putting us in the map. + visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; }); + } + }); + visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); + if (done.get()) + visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; }); + } + catch (IllegalArgumentException | ParseException e) { + context.error(BAD_REQUEST, Exceptions.toMessageString(e)); + } + catch (RuntimeException e) { + context.error(ERROR, Exceptions.toMessageString(e)); + } + } + + @Override + public String routeToCluster(String cluster) { + return resolveCluster(Optional.of(cluster), clusters).route(); + } + + private ResponseHandler handlerOf(DocumentOperationParameters parameters, OperationContext context) { + return response -> { + parameters.responseHandler().ifPresent(originalHandler -> originalHandler.handleResponse(response)); + if (response.isSuccess()) + context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) + : Optional.empty()); + else + context.error(toErrorType(response.outcome()), response.getTextMessage()); + }; + } + + /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */ + private void accept(Supplier<Result> operation, OperationContext context) { + timeouts.add(operation, context); + if (throttled.size() > 0 || ! send(operation, context)) + if ( ! throttled.add(operation, context)) + context.error(OVERLOAD, maxThrottled + " requests already in retry queue"); + } + + /** Attempts to send the given operation through the async session of this, returning {@code false} if throttled. */ + private boolean send(Supplier<Result> operation, OperationContext context) { + Result result = operation.get(); + switch (result.type()) { + case SUCCESS: + return true; + case TRANSIENT_ERROR: + return false; + default: + log.log(WARNING, "Unknown result type '" + result.type() + "'"); + case FATAL_ERROR: // intentional fallthrough + context.error(ERROR, result.getError().getMessage()); + return true; // Request handled, don't retry. + } + } + + private static ErrorType toErrorType(Response.Outcome outcome) { + switch (outcome) { + case NOT_FOUND: + return NOT_FOUND; + case CONDITION_FAILED: + return PRECONDITION_FAILED; + default: + log.log(WARNING, "Unexpected response outcome: " + outcome); + case ERROR: // intentional fallthrough + return ERROR; + } + } + + + /** + * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them. + * + * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility + * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions. + */ + static class DelayQueue { + + private final long maxSize; + private final Clock clock; + private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>(); + private final AtomicLong size = new AtomicLong(0); + private final Thread maintainer; + private final Duration delay; + private final long defaultWaitMillis; + + public DelayQueue(long maxSize, BiPredicate<Supplier<Result>, OperationContext> action, + Duration delay, Clock clock, String threadName) { + if (maxSize < 0) + throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize); + if (delay.isNegative()) + throw new IllegalArgumentException("Delay cannot be negative, but was " + delay); + + this.maxSize = maxSize; + this.delay = delay; + this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts. + this.clock = requireNonNull(clock); + this.maintainer = new DaemonThreadFactory("document-operation-executor-" + threadName).newThread(() -> maintain(action)); + this.maintainer.start(); + } + + boolean add(Supplier<Result> operation, OperationContext context) { + if (size.incrementAndGet() > maxSize) { + size.decrementAndGet(); + return false; + } + return queue.add(new Delayed(clock.instant().plus(delay), operation, context)); + } + + long size() { return size.get(); } + + Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) { + ExecutorService shutdownService = Executors.newSingleThreadExecutor(); + Future<?> future = shutdownService.submit(() -> { + try { + long doomMillis = clock.millis() + grace.toMillis(); + while (size.get() > 0 && clock.millis() < doomMillis) + Thread.sleep(100); + } + finally { + maintainer.interrupt(); + for (Delayed delayed; (delayed = queue.poll()) != null; ) { + size.decrementAndGet(); + onShutdown.accept(delayed.context()); + } + } + return null; + }); + shutdownService.shutdown(); + return future; + } + + /** + * Repeatedly loops through the queue, evicting already handled entries and processing those + * which have become ready since last time, then waits until new items are guaranteed to be ready, + * or until it's time for a new run just to ensure GC of handled entries. + * The entries are assumed to always be added to the back of the queue, with the same delay. + * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time + * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first. + */ + private void maintain(BiPredicate<Supplier<Result>, OperationContext> action) { + while ( ! Thread.currentThread().isInterrupted()) { + try { + Instant waitUntil = null; + Iterator<Delayed> operations = queue.iterator(); + boolean rejected = false; + while (operations.hasNext()) { + Delayed delayed = operations.next(); + // Already handled: remove and continue. + if (delayed.context().handled()) { + operations.remove(); + size.decrementAndGet(); + continue; + } + // Ready for action: remove from queue and run unless an operation was already rejected. + if (delayed.readyAt().isBefore(clock.instant()) && ! rejected) { + if (action.test(delayed.operation(), delayed.context())) { + operations.remove(); + size.decrementAndGet(); + continue; + } + else { // If an operation is rejected, handle no more this run, and wait a short while before retrying. + waitUntil = clock.instant().plus(Duration.ofMillis(10)); + rejected = true; + } + } + // Not yet ready for action: keep time to wake up again. + waitUntil = waitUntil != null ? waitUntil : delayed.readyAt(); + } + long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis; + synchronized (this) { + do { + notify(); + wait(Math.max(0, waitUntilMillis - clock.millis())); + } + while (clock.millis() < waitUntilMillis); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + catch (Exception e) { + log.log(SEVERE, "Exception caught by delay queue maintainer", e); + } + } + } + } + + + private static class Delayed { + + private final Supplier<Result> operation; + private final OperationContext context; + private final Instant readyAt; + + Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) { + this.readyAt = requireNonNull(readyAt); + this.context = requireNonNull(context); + this.operation = requireNonNull(operation); + } + + Supplier<Result> operation() { return operation; } + OperationContext context() { return context; } + Instant readyAt() { return readyAt; } + + } + + + static class StorageCluster { + + private final String name; + private final String configId; + private final Map<String, String> documentBuckets; + + StorageCluster(String name, String configId, Map<String, String> documentBuckets) { + this.name = requireNonNull(name); + this.configId = requireNonNull(configId); + this.documentBuckets = Map.copyOf(documentBuckets); + } + + String name() { return name; } + String configId() { return configId; } + String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; } + Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } + + } + + + static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) { + if (clusters.isEmpty()) + throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled"); + + return wanted.map(cluster -> { + if ( ! clusters.containsKey(cluster)) + throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" + + String.join("', '", clusters.keySet()) + "'"); + + return clusters.get(cluster); + }).orElseGet(() -> { + if (clusters.size() > 1) + throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" + + String.join("', '", clusters.keySet()) + "'"); + + return clusters.values().iterator().next(); + }); + } + + private static String resolveBucket(StorageCluster cluster, Optional<String> documentType, + List<String> bucketSpaces, Optional<String> bucketSpace) { + return documentType.map(type -> cluster.bucketOf(type) + .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() + + "' is not mapped to a known bucket space"))) + .or(() -> bucketSpace.map(space -> { + if ( ! bucketSpaces.contains(space)) + throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " + + String.join(", ", bucketSpaces)); + return space; + })) + .orElse(FixedBucketSpaces.defaultSpace()); + } + + + + private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) { + return clusters.storage().stream() + .collect(toUnmodifiableMap(storage -> storage.name(), + storage -> new StorageCluster(storage.name(), + storage.configid(), + buckets.cluster(storage.name()) + .documentType().entrySet().stream() + .collect(toMap(entry -> entry.getKey(), + entry -> entry.getValue().bucketSpace()))))); + } + + + // Visible for testing. + AsyncSession asyncSession() { return asyncSession; } + Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); } + void notifyMaintainers() throws InterruptedException { + synchronized (throttled) { throttled.notify(); throttled.wait(); } + synchronized (timeouts) { timeouts.notify(); timeouts.wait(); } + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java new file mode 100644 index 00000000000..809e3522652 --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -0,0 +1,620 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi.resource; + +import com.fasterxml.jackson.core.JsonFactory; +import com.google.inject.Inject; +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.container.core.documentapi.VespaDocumentAccess; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.document.json.JsonReader; +import com.yahoo.document.json.JsonWriter; +import com.yahoo.document.json.document.DocumentParser; +import com.yahoo.document.restapi.DocumentOperationExecutor; +import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType; +import com.yahoo.document.restapi.DocumentOperationExecutor.Group; +import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext; +import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext; +import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; +import com.yahoo.document.restapi.DocumentOperationExecutorConfig; +import com.yahoo.document.restapi.DocumentOperationExecutorImpl; +import com.yahoo.documentapi.DocumentOperationParameters; +import com.yahoo.documentapi.metrics.DocumentApiMetrics; +import com.yahoo.documentapi.metrics.DocumentOperationStatus; +import com.yahoo.documentapi.metrics.DocumentOperationType; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.AbstractRequestHandler; +import com.yahoo.jdisc.handler.CompletionHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ReadableContentChannel; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.jdisc.handler.UnsafeContentInputStream; +import com.yahoo.container.core.HandlerMetricContextUtil; +import com.yahoo.jdisc.http.HttpRequest; +import com.yahoo.jdisc.http.HttpRequest.Method; +import com.yahoo.metrics.simple.MetricReceiver; +import com.yahoo.restapi.Path; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Inspector; +import com.yahoo.slime.Slime; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.yolean.Exceptions; + +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.time.Clock; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.logging.Logger; + +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; +import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE; +import static com.yahoo.jdisc.http.HttpRequest.Method.GET; +import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS; +import static com.yahoo.jdisc.http.HttpRequest.Method.POST; +import static com.yahoo.jdisc.http.HttpRequest.Method.PUT; +import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.WARNING; +import static java.util.stream.Collectors.joining; + +/** + * Asynchronous HTTP handler for /document/v1/ + * + * @author jonmv + */ +public class DocumentV1ApiHandler extends AbstractRequestHandler { + + private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName()); + private static final Parser<Integer> numberParser = Integer::parseInt; + private static final Parser<Boolean> booleanParser = Boolean::parseBoolean; + + private static final CompletionHandler logException = new CompletionHandler() { + @Override public void completed() { } + @Override public void failed(Throwable t) { + log.log(FINE, () -> "Exception writing or closing response data: " + Exceptions.toMessageString(t)); + } + }; + + private static final ContentChannel ignoredContent = new ContentChannel() { + @Override public void write(ByteBuffer buf, CompletionHandler handler) { handler.completed(); } + @Override public void close(CompletionHandler handler) { handler.completed(); } + }; + + private static final String CREATE = "create"; + private static final String CONDITION = "condition"; + private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get + private static final String FIELD_SET = "fieldSet"; + private static final String SELECTION = "selection"; + private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get + private static final String CONTINUATION = "continuation"; + private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount"; + private static final String CONCURRENCY = "concurrency"; + private static final String BUCKET_SPACE = "bucketSpace"; + + private final Clock clock; + private final Metric metric; // TODO jonmv: make response class which logs on completion/error + private final DocumentApiMetrics metrics; + private final DocumentOperationExecutor executor; + private final DocumentOperationParser parser; + private final Map<String, Map<Method, Handler>> handlers; + + @Inject + public DocumentV1ApiHandler(Clock clock, + Metric metric, + MetricReceiver metricReceiver, + VespaDocumentAccess documentAccess, + DocumentmanagerConfig documentManagerConfig, + ClusterListConfig clusterListConfig, + AllClustersBucketSpacesConfig bucketSpacesConfig, + DocumentOperationExecutorConfig executorConfig) { + this(clock, + new DocumentOperationExecutorImpl(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock), + new DocumentOperationParser(documentManagerConfig), + metric, + metricReceiver); + } + + DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser, + Metric metric, MetricReceiver metricReceiver) { + this.clock = clock; + this.executor = executor; + this.parser = parser; + this.metric = metric; + this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1"); + this.handlers = defineApi(); + } + + @Override + public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) { + HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass()); + ResponseHandler responseHandler = response -> { + HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass()); + return rawResponseHandler.handleResponse(response); + }; + + HttpRequest request = (HttpRequest) rawRequest; + try { + Path requestPath = new Path(request.getUri()); + for (String path : handlers.keySet()) + if (requestPath.matches(path)) { + Map<Method, Handler> methods = handlers.get(path); + if (methods.containsKey(request.getMethod())) + return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler); + + if (request.getMethod() == OPTIONS) + return options(methods.keySet(), responseHandler); + + return methodNotAllowed(request, methods.keySet(), responseHandler); + } + return notFound(request, handlers.keySet(), responseHandler); + } + catch (IllegalArgumentException e) { + return badRequest(request, e, responseHandler); + } + catch (RuntimeException e) { + return serverError(request, e, responseHandler); + } + } + + @Override + public void destroy() { + this.executor.shutdown(); + } + + private Map<String, Map<Method, Handler>> defineApi() { + Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>(); + + handlers.put("/document/v1/", + Map.of(GET, this::getRoot)); + + handlers.put("/document/v1/{namespace}/{documentType}/docid/", + Map.of(GET, this::getDocumentType)); + + handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/", + Map.of(GET, this::getDocumentType)); + + handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/", + Map.of(GET, this::getDocumentType)); + + handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}", + Map.of(GET, this::getDocument, + POST, this::postDocument, + PUT, this::putDocument, + DELETE, this::deleteDocument)); + + handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/{docid}", + Map.of(GET, this::getDocument, + POST, this::postDocument, + PUT, this::putDocument, + DELETE, this::deleteDocument)); + + handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/{docid}", + Map.of(GET, this::getDocument, + POST, this::postDocument, + PUT, this::putDocument, + DELETE, this::deleteDocument)); + + return Collections.unmodifiableMap(handlers); + } + + private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) { + Cursor root = responseRoot(request); + executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler)); + return ignoredContent; + } + + private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) { + Cursor root = responseRoot(request); + VisitorOptions.Builder options = parseOptions(request, path); + options = options.documentType(path.documentType()); + options = options.namespace(path.namespace()); + options = path.group().map(options::group).orElse(options); + executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler)); + return ignoredContent; + } + + private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) { + Object monitor = new Object(); + return new VisitOperationsContext((type, message) -> { + synchronized (monitor) { + handleError(request, type, message, root, handler); + } + }, + token -> { + token.ifPresent(value -> root.setString("continuation", value)); + synchronized (monitor) { + respond(root, handler); + } + }, + // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway + document -> { + try { + synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization. + SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(), + documents.addObject()); + } + } + // TODO jonmv: This shouldn't happen much, but ... expose errors too? + catch (RuntimeException e) { + log.log(WARNING, "Exception serializing document in document/v1 visit response", e); + } + }); + } + + private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) { + DocumentId id = path.id(); + DocumentOperationParameters parameters = parameters(); + parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters); + parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters); + executor.get(id, + parameters, + new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), + document -> { + try { + Cursor root = responseRoot(request, id); + document.map(JsonWriter::toByteArray) + .map(SlimeUtils::jsonToSlime) + .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields"))); + respond(document.isPresent() ? 200 : 404, + root, + handler); + } + catch (Exception e) { + serverError(request, new RuntimeException(e), handler); + } + })); + return ignoredContent; + } + + private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { + DocumentId id = path.id(); + ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.PUT, clock.instant()); + return new ForwardingContentChannel(in -> { + try { + DocumentPut put = parser.parsePut(in, id.toString()); + getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); + executor.put(put, + getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()), + new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), + __ -> respond(responseRoot(request, id), handler))); + } + catch (IllegalArgumentException e) { + badRequest(request, e, handler); + } + catch (RuntimeException e) { + serverError(request, e, handler); + } + }); + } + + private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { + DocumentId id = path.id(); + ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.UPDATE, clock.instant()); + return new ForwardingContentChannel(in -> { + try { + DocumentUpdate update = parser.parseUpdate(in, id.toString()); + getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); + getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent); + executor.update(update, + getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()), + new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), + __ -> respond(responseRoot(request, id), handler))); + } + catch (IllegalArgumentException e) { + badRequest(request, e, handler); + } + catch (RuntimeException e) { + serverError(request, e, handler); + } + }); + } + + private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { + DocumentId id = path.id(); + ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.REMOVE, clock.instant()); + executor.remove(id, + getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()), + new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), + __ -> respond(responseRoot(request, id), handler))); + return ignoredContent; + } + + private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) { + switch (type) { + case BAD_REQUEST: + badRequest(request, message, root, handler); + break; + case NOT_FOUND: + notFound(request, message, root, handler); + break; + case PRECONDITION_FAILED: + preconditionFailed(request, message, root, handler); + break; + case OVERLOAD: + overload(request, message, root, handler); + break; + case TIMEOUT: + timeout(request, message, root, handler); + break; + default: + log.log(WARNING, "Unexpected error type '" + type + "'"); + case ERROR: // intentional fallthrough + serverError(request, message, root, handler); + } + } + + // ------------------------------------------------ Responses ------------------------------------------------ + + private static Cursor responseRoot(HttpRequest request) { + Cursor root = new Slime().setObject(); + root.setString("pathId", request.getUri().getRawPath()); + return root; + } + + private static Cursor responseRoot(HttpRequest request, DocumentId id) { + Cursor root = responseRoot(request); + root.setString("id", id.toString()); + return root; + } + + private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) { + Response response = new Response(Response.Status.NO_CONTENT); + response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(","))); + handler.handleResponse(response).close(logException); + return ignoredContent; + } + + private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) { + return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler); + } + + private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) { + log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message); + root.setString("message", message); + return respond(Response.Status.BAD_REQUEST, root, handler); + } + + private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) { + return notFound(request, + "Nothing at '" + request.getUri().getRawPath() + "'. " + + "Available paths are:\n" + String.join("\n", paths), + responseRoot(request), + handler); + } + + private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) { + root.setString("message", message); + return respond(Response.Status.NOT_FOUND, root, handler); + } + + private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) { + Cursor root = responseRoot(request); + root.setString("message", + "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " + + "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", "))); + return respond(Response.Status.METHOD_NOT_ALLOWED, + root, + handler); + } + + private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) { + root.setString("message", message); + return respond(Response.Status.PRECONDITION_FAILED, root, handler); + } + + private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) { + log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); + root.setString("message", message); + return respond(Response.Status.TOO_MANY_REQUESTS, root, handler); + } + + private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) { + log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e); + Cursor root = responseRoot(request); + root.setString("message", Exceptions.toMessageString(e)); + return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler); + } + + private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) { + log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); + root.setString("message", message); + return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler); + } + + private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) { + log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); + root.setString("message", message); + return respond(Response.Status.GATEWAY_TIMEOUT, root, handler); + } + + private static ContentChannel respond(Inspector root, ResponseHandler handler) { + return respond(200, root, handler); + } + + private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) { + Response response = new Response(status); + response.headers().put("Content-Type", "application/json; charset=UTF-8"); + ContentChannel out = null; + try { + out = handler.handleResponse(response); + out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException); + } + catch (Exception e) { + log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e)); + } + finally { + if (out != null) try { + out.close(logException); + } + catch (Exception e) { + log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e)); + } + } + return ignoredContent; + } + + // ------------------------------------------------ Helpers ------------------------------------------------ + + private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) { + VisitorOptions.Builder options = VisitorOptions.builder(); + + getProperty(request, SELECTION).ifPresent(options::selection); + getProperty(request, CONTINUATION).ifPresent(options::continuation); + getProperty(request, FIELD_SET).ifPresent(options::fieldSet); + getProperty(request, CLUSTER).ifPresent(options::cluster); + getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace); + getProperty(request, WANTED_DOCUMENT_COUNT, numberParser) + .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count))); + getProperty(request, CONCURRENCY, numberParser) + .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency))); + + return options; + } + + static class DocumentPath { + + private final Path path; + private final Optional<Group> group; + + DocumentPath(Path path) { + this.path = requireNonNull(path); + this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of) + .or(() -> Optional.ofNullable(path.get("group")).map(Group::of)); + } + + DocumentId id() { + return new DocumentId("id:" + requireNonNull(path.get("namespace")) + + ":" + requireNonNull(path.get("documentType")) + + ":" + group.map(Group::docIdPart).orElse("") + + ":" + requireNonNull(path.get("docid"))); + } + + String documentType() { return requireNonNull(path.get("documentType")); } + String namespace() { return requireNonNull(path.get("namespace")); } + Optional<Group> group() { return group; } + + } + + private static Optional<String> getProperty(HttpRequest request, String name) { + List<String> values = request.parameters().get(name); + if (values != null && values.size() != 0) + return Optional.ofNullable(values.get(values.size() - 1)); + + return Optional.empty(); + } + + private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) { + return getProperty(request, name).map(parser::parse); + } + + + @FunctionalInterface + interface Parser<T> extends Function<String, T> { + default T parse(String value) { + try { + return apply(value); + } + catch (RuntimeException e) { + throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e)); + } + } + } + + + @FunctionalInterface + interface Handler { + ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler); + } + + + /** Readable content channel which forwards data to a reader when closed. */ + static class ForwardingContentChannel implements ContentChannel { + + private final ReadableContentChannel delegate = new ReadableContentChannel(); + private final Consumer<InputStream> reader; + + public ForwardingContentChannel(Consumer<InputStream> reader) { + this.reader = reader; + } + + @Override + public void write(ByteBuffer buf, CompletionHandler handler) { + delegate.write(buf, handler); + } + + @Override + public void close(CompletionHandler handler) { + delegate.close(handler); + try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) { + reader.accept(in); + } + } + + } + + + static class DocumentOperationParser { + + private static final JsonFactory jsonFactory = new JsonFactory(); + + private final DocumentTypeManager manager; + + DocumentOperationParser(DocumentmanagerConfig config) { + this.manager = new DocumentTypeManager(config); + } + + DocumentPut parsePut(InputStream inputStream, String docId) { + return (DocumentPut) parse(inputStream, docId, DocumentParser.SupportedOperation.PUT); + } + + DocumentUpdate parseUpdate(InputStream inputStream, String docId) { + return (DocumentUpdate) parse(inputStream, docId, DocumentParser.SupportedOperation.UPDATE); + } + + private DocumentOperation parse(InputStream inputStream, String docId, DocumentParser.SupportedOperation operation) { + return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId); + } + + } + + private class MeasuringResponseHandler implements ResponseHandler { + + private final ResponseHandler delegate; + private final DocumentOperationType type; + private final Instant start; + + private MeasuringResponseHandler(ResponseHandler delegate, DocumentOperationType type, Instant start) { + this.delegate = delegate; + this.type = type; + this.start = start; + } + + @Override + public ContentChannel handleResponse(Response response) { + switch (response.getStatus() / 100) { + case 2: metrics.reportSuccessful(type, start); break; + case 4: metrics.reportFailure(type, DocumentOperationStatus.REQUEST_ERROR); break; + case 5: metrics.reportFailure(type, DocumentOperationStatus.SERVER_ERROR); break; + } + return delegate.handleResponse(response); + } + + } + +} diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def new file mode 100644 index 00000000000..19f4f50648b --- /dev/null +++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def @@ -0,0 +1,15 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package=com.yahoo.document.restapi + +# Delay before a throttled operation is retried. +resendDelayMillis int default=100 + +# Time between a document operation is received and a timeout response is sent +defaultTimeoutSeconds int default=180 + +# Time after which a visitor session times out +visitTimeoutSeconds int default=120 + +# Bound on number of document operations to keep in retry queue — further operations are rejected +maxThrottled int default=200 + diff --git a/vespaclient-container-plugin/src/test/cfg/music.sd b/vespaclient-container-plugin/src/test/cfg/music.sd new file mode 100644 index 00000000000..a289f5a686b --- /dev/null +++ b/vespaclient-container-plugin/src/test/cfg/music.sd @@ -0,0 +1,6 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +search music { + document music { + field artist type string { } + } +} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java new file mode 100644 index 00000000000..3d350adab87 --- /dev/null +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java @@ -0,0 +1,85 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi; + +import com.yahoo.document.DocumentGet; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.documentapi.DocumentOperationParameters; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author jonmv + */ +public class DocumentOperationExecutorMock implements DocumentOperationExecutor { + + final AtomicReference<DocumentOperation> lastOperation = new AtomicReference<>(); + final AtomicReference<DocumentOperationParameters> lastParameters = new AtomicReference<>(); + final AtomicReference<OperationContext> lastOperationContext = new AtomicReference<>(); + final AtomicReference<VisitorOptions> lastOptions = new AtomicReference<>(); + final AtomicReference<VisitOperationsContext> lastVisitContext = new AtomicReference<>(); + + @Override + public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(new DocumentGet(id), parameters, context); + } + + @Override + public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(put, parameters, context); + } + + @Override + public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(update, parameters, context); + } + + @Override + public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(new DocumentRemove(id), parameters, context); + } + + @Override + public void visit(VisitorOptions options, VisitOperationsContext context) { + lastOptions.set(options); + lastVisitContext.set(context); + } + + @Override + public String routeToCluster(String cluster) { + if ("throw-me".equals(cluster)) + throw new IllegalArgumentException(cluster); + + return "route-to-" + cluster; + } + + public DocumentOperation lastOperation() { + return lastOperation.get(); + } + + public DocumentOperationParameters lastParameters() { + return lastParameters.get(); + } + + public OperationContext lastOperationContext() { + return lastOperationContext.get(); + } + + public VisitorOptions lastOptions() { + return lastOptions.get(); + } + + public VisitOperationsContext lastVisitContext() { + return lastVisitContext.get(); + } + + private void setLastOperation(DocumentOperation operation, DocumentOperationParameters parameters, OperationContext context) { + lastOperation.set(operation); + lastParameters.set(parameters); + lastOperationContext.set(context); + } + +} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java new file mode 100644 index 00000000000..1d2f6af35dd --- /dev/null +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java @@ -0,0 +1,406 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi; + +import com.yahoo.application.container.DocumentAccesses; +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.FixedBucketSpaces; +import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType; +import com.yahoo.document.restapi.DocumentOperationExecutor.Group; +import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext; +import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext; +import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; +import com.yahoo.document.restapi.DocumentOperationExecutorImpl.StorageCluster; +import com.yahoo.document.restapi.DocumentOperationExecutorImpl.DelayQueue; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.local.LocalAsyncSession; +import com.yahoo.documentapi.local.LocalDocumentAccess; +import com.yahoo.test.ManualClock; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * This test uses a config definition for the "music" document type, which has a single string field "artist". + * One cluster named "content" exists, and can be reached through the "route" route for "music" documents. + * + * @author jonmv + */ +public class DocumentOperationExecutorTest { + + final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder() + .cluster("content", + new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", + new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace(FixedBucketSpaces.defaultSpace()))) + .build(); + final ClusterListConfig clusterConfig = new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder().configid("config-id") + .name("content")) + .build(); + final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder() + .resendDelayMillis(10) + .defaultTimeoutSeconds(1) + .maxThrottled(2) + .build(); + final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content", + "config-id", + Map.of("music", "route"))); + final List<Document> received = new ArrayList<>(); + final List<ErrorType> errors = new ArrayList<>(); + final List<String> messages = new ArrayList<>(); + final List<String> tokens = new ArrayList<>(); + ManualClock clock; + LocalDocumentAccess access; + DocumentOperationExecutorImpl executor; + DocumentType musicType; + Document doc1; + Document doc2; + Document doc3; + + OperationContext operationContext() { + return new OperationContext((type, error) -> { errors.add(type); messages.add(error); }, + document -> document.ifPresent(received::add)); + } + + VisitOperationsContext visitContext() { + return new VisitOperationsContext((type, error) -> { errors.add(type); messages.add(error); }, + token -> token.ifPresent(tokens::add), + received::add); + } + + LocalAsyncSession session() { + return (LocalAsyncSession) executor.asyncSession(); + } + + @Before + public void setUp() { + clock = new ManualClock(); + access = DocumentAccesses.createFromSchemas("src/test/cfg"); + executor = new DocumentOperationExecutorImpl(clusterConfig, bucketConfig, executorConfig, access, clock); + received.clear(); + errors.clear(); + tokens.clear(); + + musicType = access.getDocumentTypeManager().getDocumentType("music"); + doc1 = new Document(musicType, "id:ns:music::1"); doc1.setFieldValue("artist", "one"); + doc2 = new Document(musicType, "id:ns:music:n=1:2"); doc2.setFieldValue("artist", "two"); + doc3 = new Document(musicType, "id:ns:music:g=a:3"); + } + + @After + public void tearDown() { + access.shutdown(); + } + + @Test + public void testResolveCluster() { + assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", + executor.routeToCluster("content")); + try { + executor.routeToCluster("blargh"); + fail("Should not find this cluster"); + } + catch (IllegalArgumentException e) { + assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage()); + } + assertEquals("content", DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), clusters).name()); + try { + DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of()); + fail("No clusters should fail"); + } + catch (IllegalArgumentException e) { + assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage()); + } + try { + Map<String, StorageCluster> twoClusters = new TreeMap<>(); + twoClusters.put("one", new StorageCluster("one", "one-config", Map.of())); + twoClusters.put("two", new StorageCluster("two", "two-config", Map.of())); + DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), twoClusters); + fail("More than one cluster and no document type should fail"); + } + catch (IllegalArgumentException e) { + assertEquals("Please specify one of the content clusters in your Vespa deployment: 'one', 'two'", e.getMessage()); + } + } + + @Test + public void testThrottling() throws InterruptedException { + executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts. + // Put documents 1 and 2 into backend. + executor.put(new DocumentPut(doc1), parameters(), operationContext()); + executor.put(new DocumentPut(doc2), parameters(), operationContext()); + assertEquals(List.of(doc1, doc2), received); + + session().setResultType(Result.ResultType.TRANSIENT_ERROR); + + // First two are put on retry queue. + executor.get(doc1.getId(), parameters(), operationContext()); + executor.get(doc2.getId(), parameters(), operationContext()); + assertEquals(List.of(), errors); + + // Third operation is rejected. + executor.get(doc3.getId(), parameters(), operationContext()); + assertEquals(List.of(OVERLOAD), errors); + + // Maintainer does not yet run. + executor.notifyMaintainers(); + // Third operation is rejected again. + executor.get(doc3.getId(), parameters(), operationContext()); + assertEquals(List.of(OVERLOAD, OVERLOAD), errors); + + // Maintainer retries documents, but they're put back into the queue with a new delay. + clock.advance(Duration.ofMillis(20)); + executor.notifyMaintainers(); + assertEquals(List.of(OVERLOAD, OVERLOAD), errors); + + session().setResultType(Result.ResultType.SUCCESS); + // Maintainer retries documents again, this time successfully. + clock.advance(Duration.ofMillis(20)); + executor.notifyMaintainers(); + assertEquals(List.of(OVERLOAD, OVERLOAD), errors); + assertEquals(List.of(doc1, doc2, doc1, doc2), received); + } + + @Test + public void testTimeout() throws InterruptedException { + Phaser phaser = new Phaser(1); + access.setPhaser(phaser); + executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts. + + // Put 1 times out after 1010 ms, Put 2 succeeds after 1010 ms + executor.put(new DocumentPut(doc1), parameters(), operationContext()); + clock.advance(Duration.ofMillis(20)); + executor.put(new DocumentPut(doc2), parameters(), operationContext()); + executor.notifyMaintainers(); + assertEquals(List.of(), errors); + assertEquals(List.of(), received); + + clock.advance(Duration.ofMillis(990)); + executor.notifyMaintainers(); // Let doc1 time out. + phaser.arriveAndAwaitAdvance(); // Let doc2 arrive. + phaser.arriveAndAwaitAdvance(); // Wait for responses to be delivered. + assertEquals(List.of(TIMEOUT), errors); + assertEquals(List.of(doc2), received); + + session().setResultType(Result.ResultType.TRANSIENT_ERROR); + executor.put(new DocumentPut(doc3), parameters(), operationContext()); + clock.advance(Duration.ofMillis(990)); + executor.notifyMaintainers(); // Retry throttled operation. + clock.advance(Duration.ofMillis(20)); + executor.notifyMaintainers(); // Time out throttled operation. + assertEquals(List.of(TIMEOUT, TIMEOUT), errors); + assertEquals(List.of(doc2), received); + + session().setResultType(Result.ResultType.SUCCESS); + clock.advance(Duration.ofMillis(20)); + executor.notifyMaintainers(); // Retry not attempted since operation already timed out. + phaser.arriveAndAwaitAdvance(); + phaser.arriveAndAwaitAdvance(); + assertEquals(List.of(TIMEOUT, TIMEOUT), errors); + assertEquals(List.of(doc2), received); + } + + @Test + public void testCallback() { + AtomicBoolean called = new AtomicBoolean(); + executor.get(doc1.getId(), parameters().withResponseHandler(__ -> called.set(true)), operationContext()); + assertTrue(called.get()); + assertEquals(List.of(), messages); + assertEquals(List.of(), errors); + assertEquals(List.of(), received); + } + + @Test + public void testVisit() throws InterruptedException { + executor.put(new DocumentPut(doc1), parameters(), operationContext()); + executor.put(new DocumentPut(doc2), parameters(), operationContext()); + executor.put(new DocumentPut(doc3), parameters(), operationContext()); + assertEquals(doc1, received.remove(0)); + assertEquals(doc2, received.remove(0)); + assertEquals(doc3, received.remove(0)); + + // No cluster or document type set. + executor.visit(VisitorOptions.builder() + .build(), + visitContext()); + assertEquals("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level", messages.remove(0)); + assertEquals(BAD_REQUEST, errors.remove(0)); + assertEquals(List.of(), received); + + // Cluster not found. + executor.visit(VisitorOptions.builder() + .cluster("blargh") + .build(), + visitContext()); + assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", messages.remove(0)); + assertEquals(BAD_REQUEST, errors.remove(0)); + assertEquals(List.of(), received); + + // Matches doc2 for user 1. + executor.visit(VisitorOptions.builder() + .cluster("content") + .group(Group.of(1)) + .build(), + visitContext()); + for (VisitorControlHandler session : executor.visitorSessions()) { + session.waitUntilDone(); + } + assertEquals(List.of(), messages); + assertEquals(List.of(), errors); + assertEquals(doc2, received.remove(0)); + + // Matches documents in namespace ns of type music in group a. + executor.visit(VisitorOptions.builder() + .concurrency(2) + .wantedDocumentCount(3) + .namespace("ns") + .documentType("music") + .fieldSet("music:artist") + .group(Group.of("a")) + .build(), + visitContext()); + for (VisitorControlHandler session : executor.visitorSessions()) + session.waitUntilDone(); + assertEquals(List.of(), messages); + assertEquals(List.of(), errors); + assertEquals(doc3, received.remove(0)); + + // Matches documents with non-empty artist field. + executor.visit(VisitorOptions.builder() + .cluster("content") + .selection("music.artist") + .fieldSet("[id]") + .build(), + visitContext()); + for (VisitorControlHandler session : executor.visitorSessions()) + session.waitUntilDone(); + assertEquals(List.of(), messages); + assertEquals(List.of(), errors); + assertEquals(List.of(doc1.getId(), doc2.getId()), List.of(received.remove(0).getId(), received.remove(0).getId())); + + // Matches all documents, but we'll shut down midway. + Phaser phaser = new Phaser(1); + access.setPhaser(phaser); + executor.visit(VisitorOptions.builder() + .cluster("content") + .bucketSpace("global") + .build(), + visitContext()); + phaser.arriveAndAwaitAdvance(); // First document pending + CountDownLatch latch = new CountDownLatch(1); + Thread shutdownThread = new Thread(() -> { + executor.shutdown(); + latch.countDown(); + }); + shutdownThread.start(); + clock.advance(Duration.ofMillis(100)); + executor.notifyMaintainers(); // Purge timeout operations so maintainers can shut down quickly. + latch.await(); // Make sure visit session is shut down before next document is considered. + phaser.awaitAdvance(phaser.arriveAndDeregister()); // See above. + for (VisitorControlHandler session : executor.visitorSessions()) { + session.waitUntilDone(); + } + assertEquals(List.of(), messages); + assertEquals(List.of(), errors); + assertEquals(List.of(doc1), received); + } + + @Test + public void testDelayQueue() throws ExecutionException, InterruptedException, TimeoutException { + Supplier<Result> nullOperation = () -> null; + AtomicLong counter1 = new AtomicLong(0); + AtomicLong counter2 = new AtomicLong(0); + AtomicLong counter3 = new AtomicLong(0); + AtomicBoolean throttle = new AtomicBoolean(true); + OperationContext context1 = new OperationContext((type, message) -> counter1.decrementAndGet(), doc -> counter1.incrementAndGet()); + OperationContext context2 = new OperationContext((type, message) -> counter2.decrementAndGet(), doc -> counter2.incrementAndGet()); + OperationContext context3 = new OperationContext((type, message) -> counter3.decrementAndGet(), doc -> counter3.incrementAndGet()); + DelayQueue queue = new DelayQueue(3, + (operation, context) -> { + if (throttle.get()) + return false; + + context.success(Optional.empty()); + return true; + }, + Duration.ofMillis(30), + clock, + "test"); + synchronized (queue) { queue.notify(); queue.wait(); } // Make sure maintainers have gone to wait before test starts. + + // Add three operations: + // the first shall be handled by the queue on second attempt, + // the second by an external call,and + // the third during shutdown — added later. + assertTrue(queue.add(nullOperation, context1)); + clock.advance(Duration.ofMillis(20)); + assertTrue(queue.add(nullOperation, context2)); + assertTrue(queue.add(nullOperation, context3)); + assertFalse("New entries should be rejected by a full queue", queue.add(nullOperation, context3)); + assertEquals(3, queue.size()); + assertEquals(0, counter1.get()); + assertEquals(0, counter2.get()); + assertEquals(0, counter3.get()); + + context2.error(ERROR, "error"); // Marks this as handled, ready to be evicted. + synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer does not run yet, as it's not yet time. + assertEquals(0, counter1.get()); + assertEquals(-1, counter2.get()); + assertEquals(0, counter3.get()); + assertEquals(3, queue.size()); + + clock.advance(Duration.ofMillis(15)); + synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer now runs, failing to handle first and evicting second entry. + assertEquals(0, counter1.get()); + assertEquals(-1, counter2.get()); + assertEquals(0, counter3.get()); + assertEquals(2, queue.size()); + + throttle.set(false); + clock.advance(Duration.ofMillis(15)); + synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer runs again, successfully handling first entry. + assertEquals(1, counter1.get()); + assertEquals(-1, counter2.get()); + assertEquals(0, counter3.get()); + assertEquals(1, queue.size()); + + queue.shutdown(Duration.ZERO, context -> context.error(ERROR, "shutdown")) + .get(1, TimeUnit.SECONDS); + assertEquals(1, counter1.get()); + assertEquals(-1, counter2.get()); + assertEquals(-1, counter3.get()); + assertEquals(0, queue.size()); + } + +} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java new file mode 100644 index 00000000000..9554fa0a913 --- /dev/null +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -0,0 +1,373 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi.resource; + +import com.yahoo.container.jdisc.RequestHandlerTestDriver; +import com.yahoo.docproc.jdisc.metric.NullMetric; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentGet; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.document.restapi.DocumentOperationExecutor.Group; +import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; +import com.yahoo.document.restapi.DocumentOperationExecutorMock; +import com.yahoo.document.restapi.resource.DocumentV1ApiHandler.DocumentOperationParser; +import com.yahoo.document.update.FieldUpdate; +import com.yahoo.documentapi.DocumentAccessParams; +import com.yahoo.documentapi.local.LocalDocumentAccess; +import com.yahoo.jdisc.Metric; +import com.yahoo.metrics.simple.MetricReceiver; +import com.yahoo.searchdefinition.derived.Deriver; +import com.yahoo.slime.Inspector; +import com.yahoo.slime.JsonFormat; +import com.yahoo.slime.SlimeUtils; +import com.yahoo.test.ManualClock; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Optional; + +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; +import static com.yahoo.documentapi.DocumentOperationParameters.parameters; +import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE; +import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS; +import static com.yahoo.jdisc.http.HttpRequest.Method.PATCH; +import static com.yahoo.jdisc.http.HttpRequest.Method.POST; +import static com.yahoo.jdisc.http.HttpRequest.Method.PUT; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author jonmv + */ +public class DocumentV1ApiTest { + + final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd").build(); + final DocumentTypeManager manager = new DocumentTypeManager(docConfig); + final Document doc1 = new Document(manager.getDocumentType("music"), "id:space:music::one"); + final Document doc2 = new Document(manager.getDocumentType("music"), "id:space:music:n=1:two"); + final Document doc3 = new Document(manager.getDocumentType("music"), "id:space:music:g=a:three"); + { + doc1.setFieldValue("artist", "Tom Waits"); + doc2.setFieldValue("artist", "Asa-Chan & Jun-Ray"); + } + + ManualClock clock; + DocumentOperationParser parser; + LocalDocumentAccess access; + DocumentOperationExecutorMock executor; + Metric metric; + MetricReceiver metrics; + DocumentV1ApiHandler handler; + + @Before + public void setUp() { + clock = new ManualClock(); + parser = new DocumentOperationParser(docConfig); + access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(docConfig)); + executor = new DocumentOperationExecutorMock(); + metric = new NullMetric(); + metrics = new MetricReceiver.MockReceiver(); + handler = new DocumentV1ApiHandler(clock, executor, parser, metric, metrics); + } + + @After + public void tearDown() { + handler.destroy(); + } + + @Test + public void testResponses() { + RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + // GET at non-existent path returns 404 with available paths + var response = driver.sendRequest("http://localhost/document/v1/not-found"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/not-found\"," + + " \"message\": \"Nothing at '/document/v1/not-found'. Available paths are:\\n" + + "/document/v1/\\n" + + "/document/v1/{namespace}/{documentType}/docid/\\n" + + "/document/v1/{namespace}/{documentType}/group/{group}/\\n" + + "/document/v1/{namespace}/{documentType}/number/{number}/\\n" + + "/document/v1/{namespace}/{documentType}/docid/{docid}\\n" + + "/document/v1/{namespace}/{documentType}/group/{group}/{docid}\\n" + + "/document/v1/{namespace}/{documentType}/number/{number}/{docid}\"" + + "}", + response.readAll()); + assertEquals("application/json; charset=UTF-8", response.getResponse().headers().getFirst("Content-Type")); + assertEquals(404, response.getStatus()); + + // GET at root is a visit. Numeric parameters have an upper bound. + response = driver.sendRequest("http://localhost/document/v1?cluster=lackluster&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" + + "&selection=all%20the%20things&fieldSet=[id]&continuation=token"); + executor.lastVisitContext().document(doc1); + executor.lastVisitContext().document(doc2); + executor.lastVisitContext().document(doc3); + executor.lastVisitContext().success(Optional.of("token")); + assertSameJson("{" + + " \"pathId\": \"/document/v1\"," + + " \"documents\": [" + + " {" + + " \"id\": \"id:space:music::one\"," + + " \"fields\": {" + + " \"artist\": \"Tom Waits\"" + + " }" + + " }," + + " {" + + " \"id\": \"id:space:music:n=1:two\"," + + " \"fields\": {" + + " \"artist\": \"Asa-Chan & Jun-Ray\"" + + " }" + + " }," + + " {" + + " \"id\": \"id:space:music:g=a:three\"," + + " \"fields\": {}" + + " }" + + " ]," + + " \"continuation\": \"token\"" + + "}", + response.readAll()); + assertEquals(200, response.getStatus()); + assertEquals(VisitorOptions.builder().cluster("lackluster").bucketSpace("default").wantedDocumentCount(1024) + .concurrency(100).selection("all the things").fieldSet("[id]").continuation("token").build(), + executor.lastOptions()); + + // GET with namespace and document type is a restricted visit. + response = driver.sendRequest("http://localhost/document/v1/space/music/docid"); + executor.lastVisitContext().error(BAD_REQUEST, "nope"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"documents\": []," + + " \"message\": \"nope\"" + + "}", + response.readAll()); + assertEquals(400, response.getStatus()); + assertEquals(VisitorOptions.builder().namespace("space").documentType("music").build(), + executor.lastOptions()); + + // GET with namespace, document type and group is a restricted visit. + response = driver.sendRequest("http://localhost/document/v1/space/music/group/best"); + executor.lastVisitContext().error(ERROR, "error"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/group/best\"," + + " \"documents\": []," + + " \"message\": \"error\"" + + "}", + response.readAll()); + assertEquals(500, response.getStatus()); + assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of("best")).build(), + executor.lastOptions()); + + // GET with namespace, document type and number is a restricted visit. + response = driver.sendRequest("http://localhost/document/v1/space/music/number/123"); + executor.lastVisitContext().success(Optional.empty()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/123\"," + + " \"documents\": []" + + "}", + response.readAll()); + assertEquals(200, response.getStatus()); + assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of(123)).build(), + executor.lastOptions()); + + // GET with full document ID is a document get operation which returns 404 when no document is found + response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=lackluster&fieldSet=go"); + executor.lastOperationContext().success(Optional.empty()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid/one\"," + + " \"id\": \"id:space:music::one\"" + + "}", + response.readAll()); + assertEquals(404, response.getStatus()); + assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation()); + assertEquals(parameters().withRoute("route-to-lackluster").withFieldSet("go"), executor.lastParameters()); + + // GET with full document ID is a document get operation. + response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?"); + executor.lastOperationContext().success(Optional.of(doc1)); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid/one\"," + + " \"id\": \"id:space:music::one\"," + + " \"fields\": {" + + " \"artist\": \"Tom Waits\"" + + " }" + + "}", + response.readAll()); + assertEquals(200, response.getStatus()); + assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation()); + assertEquals(parameters(), executor.lastParameters()); + + // GET with not encoded / in user specified part of document id is a 404 + response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one/two/three"); + response.readAll(); // Must drain body. + assertEquals(404, response.getStatus()); + + // POST with a document payload is a document put operation. + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST, + "{" + + " \"fields\": {" + + " \"artist\": \"Asa-Chan & Jun-Ray\"" + + " }" + + "}"); + executor.lastOperationContext().success(Optional.empty()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"id\": \"id:space:music:n=1:two\"" + + "}", + response.readAll()); + assertEquals(200, response.getStatus()); + DocumentPut put = new DocumentPut(doc2); + put.setCondition(new TestAndSetCondition("test it")); + assertEquals(put, executor.lastOperation()); + assertEquals(parameters(), executor.lastParameters()); + + // PUT with a document update payload is a document update operation. + response = driver.sendRequest("http://localhost/document/v1/space/music/group/a/three?create=true", PUT, + "{" + + " \"fields\": {" + + " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" + + " }" + + "}"); + executor.lastOperationContext().success(Optional.empty()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/group/a/three\"," + + " \"id\": \"id:space:music:g=a:three\"" + + "}", + response.readAll()); + DocumentUpdate update = new DocumentUpdate(doc3.getDataType(), doc3.getId()); + update.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl"))); + update.setCreateIfNonExistent(true); + assertEquals(update, executor.lastOperation()); + assertEquals(parameters(), executor.lastParameters()); + assertEquals(200, response.getStatus()); + + // POST with illegal payload is a 400 + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST, + "{" + + " ┻━┻︵ \\(°□°)/ ︵ ┻━┻" + + "}"); + Inspector responseRoot = SlimeUtils.jsonToSlime(response.readAll()).get(); + assertEquals("/document/v1/space/music/number/1/two", responseRoot.field("pathId").asString()); + assertTrue(responseRoot.field("message").asString().startsWith("Unexpected character ('┻' (code 9531 / 0x253b)): was expecting double-quote to start field name")); + assertEquals(400, response.getStatus()); + + // PUT on a unknown document type is a 400 + response = driver.sendRequest("http://localhost/document/v1/space/house/group/a/three?create=true", PUT, + "{" + + " \"fields\": {" + + " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" + + " }" + + "}"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/house/group/a/three\"," + + " \"message\": \"Document type house does not exist\"" + + "}", + response.readAll()); + assertEquals(400, response.getStatus()); + + // DELETE with full document ID is a document remove operation. + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?route=route", DELETE); + executor.lastOperationContext().success(Optional.empty()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"id\": \"id:space:music:n=1:two\"" + + "}", + response.readAll()); + assertEquals(200, response.getStatus()); + assertEquals(new DocumentRemove(doc2.getId()), executor.lastOperation()); + assertEquals(parameters().withRoute("route"), executor.lastParameters()); + + // GET with non-existent cluster is a 400 + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?cluster=throw-me"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"throw-me\"" + + "}", + response.readAll()); + assertEquals(400, response.getStatus()); + + // TIMEOUT is a 504 + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); + executor.lastOperationContext().error(TIMEOUT, "timeout"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"id\": \"id:space:music:n=1:two\"," + + " \"message\": \"timeout\"" + + "}", + response.readAll()); + assertEquals(504, response.getStatus()); + + // OVERLOAD is a 429 + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); + executor.lastOperationContext().error(OVERLOAD, "overload"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"id\": \"id:space:music:n=1:two\"," + + " \"message\": \"overload\"" + + "}", + response.readAll()); + assertEquals(429, response.getStatus()); + + // PRECONDITION_FAILED is a 412 + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); + executor.lastOperationContext().error(PRECONDITION_FAILED, "no dice"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"id\": \"id:space:music:n=1:two\"," + + " \"message\": \"no dice\"" + + "}", + response.readAll()); + assertEquals(412, response.getStatus()); + + // Client close during processing gives empty body + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); + response.clientClose(); + executor.lastOperationContext().error(TIMEOUT, "no dice"); + assertEquals("", response.readAll()); + assertEquals(504, response.getStatus()); + + // OPTIONS gets options + response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", OPTIONS); + assertEquals("", response.readAll()); + assertEquals(204, response.getStatus()); + assertEquals("GET,POST,PUT,DELETE", response.getResponse().headers().getFirst("Allow")); + + // PATCH is not allowed + response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", PATCH); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid/one\"," + + " \"message\": \"'PATCH' not allowed at '/document/v1/space/music/docid/one'. Allowed methods are: GET, POST, PUT, DELETE\"" + + "}", + response.readAll()); + assertEquals(405, response.getStatus()); + + driver.close(); + } + + void assertSameJson(String expected, String actual) { + ByteArrayOutputStream expectedPretty = new ByteArrayOutputStream(); + ByteArrayOutputStream actualPretty = new ByteArrayOutputStream(); + JsonFormat formatter = new JsonFormat(false); + try { + formatter.encode(actualPretty, SlimeUtils.jsonToSlimeOrThrow(actual)); + formatter.encode(expectedPretty, SlimeUtils.jsonToSlimeOrThrow(expected)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + assertEquals(expectedPretty.toString(UTF_8), actualPretty.toString(UTF_8)); + } + +} |