summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--application/abi-spec.json11
-rw-r--r--application/src/main/java/com/yahoo/application/container/DocumentAccesses.java44
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java54
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java2
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java32
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java7
-rw-r--r--document/abi-spec.json4
-rw-r--r--document/src/main/java/com/yahoo/document/DocumentPut.java16
-rw-r--r--document/src/main/java/com/yahoo/document/TestAndSetCondition.java14
-rw-r--r--document/src/main/java/com/yahoo/document/json/TokenBuffer.java4
-rw-r--r--documentapi/abi-spec.json31
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java50
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java162
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java55
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java24
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java27
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java66
-rw-r--r--jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java22
-rw-r--r--testutil/src/main/java/com/yahoo/test/ManualClock.java13
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java2
-rw-r--r--vespaclient-container-plugin/pom.xml6
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java310
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java496
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java620
-rw-r--r--vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def15
-rw-r--r--vespaclient-container-plugin/src/test/cfg/music.sd6
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java85
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java406
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java373
29 files changed, 2792 insertions, 165 deletions
diff --git a/application/abi-spec.json b/application/abi-spec.json
index d3e7ab6daef..690facffae7 100644
--- a/application/abi-spec.json
+++ b/application/abi-spec.json
@@ -271,6 +271,17 @@
"public static final enum com.yahoo.application.Networking disable"
]
},
+ "com.yahoo.application.container.DocumentAccesses": {
+ "superClass": "java.lang.Object",
+ "interfaces": [],
+ "attributes": [
+ "public"
+ ],
+ "methods": [
+ "public static com.yahoo.documentapi.local.LocalDocumentAccess createFromSchemas(java.lang.String)"
+ ],
+ "fields": []
+ },
"com.yahoo.application.container.DocumentProcessing": {
"superClass": "java.lang.Object",
"interfaces": [],
diff --git a/application/src/main/java/com/yahoo/application/container/DocumentAccesses.java b/application/src/main/java/com/yahoo/application/container/DocumentAccesses.java
new file mode 100644
index 00000000000..c0edad2baa6
--- /dev/null
+++ b/application/src/main/java/com/yahoo/application/container/DocumentAccesses.java
@@ -0,0 +1,44 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.application.container;
+
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.DocumentAccessParams;
+import com.yahoo.documentapi.local.LocalDocumentAccess;
+import com.yahoo.searchdefinition.derived.Deriver;
+
+import java.io.File;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Utility for working with a {@link LocalDocumentAccess} for unit testing components which require a {@link DocumentAccess}.
+ *
+ * @author jonmv
+ */
+public class DocumentAccesses {
+
+ private DocumentAccesses() { }
+
+ /**
+ * Reads the {@code .sd} files in the given directory, and returns a {@link LocalDocumentAccess} with these document types.
+ * <br>
+ * Example usage:
+ * <pre>
+ * LocalDocumentAccess access = DocumentAccesses.ofSchemas("src/main/application/schemas");
+ * </pre>
+ */
+ public static LocalDocumentAccess createFromSchemas(String schemaDirectory) {
+ File[] schemasFiles = new File(schemaDirectory).listFiles(name -> name.toString().endsWith(".sd"));
+ if (schemasFiles == null)
+ throw new IllegalArgumentException(schemaDirectory + " is not a directory");
+ if (schemasFiles.length == 0)
+ throw new IllegalArgumentException("No schema files found under " + schemaDirectory);
+ DocumentmanagerConfig config = Deriver.getDocumentManagerConfig(Stream.of(schemasFiles)
+ .map(File::toString)
+ .collect(toList())).build();
+ return new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(config));
+ }
+
+}
diff --git a/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java b/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java
new file mode 100644
index 00000000000..cfb48339dbe
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java
@@ -0,0 +1,54 @@
+package com.yahoo.container.core;
+
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.application.BindingMatch;
+import com.yahoo.jdisc.application.UriPattern;
+import com.yahoo.jdisc.handler.ResponseHandler;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Common HTTP request handler metrics code.
+ *
+ * @author jonmv
+ */
+public class HandlerMetricContextUtil {
+
+ public static void onHandle(Request request, Metric metric, Class<?> handlerClass) {
+ metric.add("handled.requests", 1, contextFor(request, metric, handlerClass));
+ }
+
+ public static void onHandled(Request request, Metric metric, Class<?> handlerClass) {
+ metric.set("handled.latency", request.timeElapsed(TimeUnit.MILLISECONDS), contextFor(request, metric, handlerClass));
+ }
+
+ public static Metric.Context contextFor(Request request, Metric metric, Class<?> handlerClass) {
+ return contextFor(request, Map.of(), metric, handlerClass);
+ }
+
+ public static Metric.Context contextFor(Request request, Map<String, String> extraDimensions, Metric metric, Class<?> handlerClass) {
+ BindingMatch<?> match = request.getBindingMatch();
+ if (match == null) return null;
+ UriPattern matched = match.matched();
+ if (matched == null) return null;
+ String name = matched.toString();
+ String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null;
+
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put("handler", name);
+ if (endpoint != null) {
+ dimensions.put("endpoint", endpoint);
+ }
+ URI uri = request.getUri();
+ dimensions.put("scheme", uri.getScheme());
+ dimensions.put("port", Integer.toString(uri.getPort()));
+ dimensions.put("handler-name", handlerClass.getName());
+ dimensions.putAll(extraDimensions);
+ return metric.createContext(dimensions);
+ }
+
+}
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java b/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java
index 8243ad07760..a3e264c16ee 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/RequestHandlerTestDriver.java
@@ -127,7 +127,7 @@ public class RequestHandlerTestDriver implements AutoCloseable {
public String read() {
ByteBuffer nextBuffer = content.read();
if (nextBuffer == null) return null; // end of transmission
- return Charset.forName("utf-8").decode(nextBuffer).toString();
+ return StandardCharsets.UTF_8.decode(nextBuffer).toString();
}
/** Returns the number of bytes available in the handler right now */
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
index 9387e03e11b..323935e2a26 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
@@ -6,8 +6,6 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.application.BindingMatch;
-import com.yahoo.jdisc.application.UriPattern;
import com.yahoo.jdisc.handler.AbstractRequestHandler;
import com.yahoo.jdisc.handler.BufferedContentChannel;
import com.yahoo.jdisc.handler.ContentChannel;
@@ -15,10 +13,9 @@ import com.yahoo.jdisc.handler.OverloadException;
import com.yahoo.jdisc.handler.ReadableContentChannel;
import com.yahoo.jdisc.handler.ResponseDispatch;
import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.container.core.HandlerMetricContextUtil;
-import java.net.URI;
import java.time.Duration;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
@@ -79,29 +76,9 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
}
Metric.Context contextFor(Request request, Map<String, String> extraDimensions) {
- BindingMatch<?> match = request.getBindingMatch();
- if (match == null) return null;
- UriPattern matched = match.matched();
- if (matched == null) return null;
- String name = matched.toString();
- String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null;
-
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put("handler", name);
- if (endpoint != null) {
- dimensions.put("endpoint", endpoint);
- }
- URI uri = request.getUri();
- dimensions.put("scheme", uri.getScheme());
- dimensions.put("port", Integer.toString(uri.getPort()));
- String handlerClassName = getClass().getName();
- dimensions.put("handler-name", handlerClassName);
- dimensions.putAll(extraDimensions);
- return this.metric.createContext(dimensions);
+ return HandlerMetricContextUtil.contextFor(request, extraDimensions, metric, getClass());
}
- private Metric.Context contextFor(Request request) { return contextFor(request, Map.of()); }
-
/**
* Handles a request by assigning a worker thread to it.
*
@@ -109,7 +86,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
*/
@Override
public final ContentChannel handleRequest(Request request, ResponseHandler responseHandler) {
- metric.add("handled.requests", 1, contextFor(request));
+ HandlerMetricContextUtil.onHandle(request, metric, getClass());
if (request.getTimeout(TimeUnit.SECONDS) == null) {
Duration timeout = getTimeout();
if (timeout != null) {
@@ -212,8 +189,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
public ContentChannel handleResponse(Response response) {
if ( tryHasResponded()) throw new IllegalStateException("Response already handled");
ContentChannel cc = responseHandler.handleResponse(response);
- long millis = request.timeElapsed(TimeUnit.MILLISECONDS);
- metric.set("handled.latency", millis, contextFor(request));
+ HandlerMetricContextUtil.onHandled(request, metric, getClass());
return cc;
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
index f3b4d1f6457..407256b19c8 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
@@ -668,10 +668,9 @@ public class ControllerTest {
assertEquals(routingMethods, Set.of(RoutingMethod.shared, RoutingMethod.sharedLayer4));
// Deployment has stored application meta.
- assertArrayEquals(applicationPackage.metaDataZip(),
- tester.controllerTester().serviceRegistry().applicationStore()
- .getMeta(new DeploymentId(context.instanceId(), zone))
- .get(tester.clock().instant()));
+ assertNotNull(tester.controllerTester().serviceRegistry().applicationStore()
+ .getMeta(new DeploymentId(context.instanceId(), zone))
+ .get(tester.clock().instant()));
// Meta data tombstone placed on delete
tester.clock().advance(Duration.ofSeconds(1));
diff --git a/document/abi-spec.json b/document/abi-spec.json
index e53cf09f07e..c9191aa2fdb 100644
--- a/document/abi-spec.json
+++ b/document/abi-spec.json
@@ -399,6 +399,8 @@
"public com.yahoo.document.DocumentId getId()",
"public void <init>(com.yahoo.document.DocumentPut)",
"public void <init>(com.yahoo.document.DocumentPut, com.yahoo.document.Document)",
+ "public boolean equals(java.lang.Object)",
+ "public int hashCode()",
"public java.lang.String toString()"
],
"fields": []
@@ -1929,6 +1931,8 @@
"public java.lang.String getSelection()",
"public boolean isPresent()",
"public static com.yahoo.document.TestAndSetCondition fromConditionString(java.util.Optional)",
+ "public boolean equals(java.lang.Object)",
+ "public int hashCode()",
"public java.lang.String toString()"
],
"fields": [
diff --git a/document/src/main/java/com/yahoo/document/DocumentPut.java b/document/src/main/java/com/yahoo/document/DocumentPut.java
index c5ce2e7e181..e24388cd65f 100644
--- a/document/src/main/java/com/yahoo/document/DocumentPut.java
+++ b/document/src/main/java/com/yahoo/document/DocumentPut.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document;
+import java.util.Objects;
+
/**
* @author Vegard Sjonfjell
*/
@@ -47,6 +49,20 @@ public class DocumentPut extends DocumentOperation {
}
@Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DocumentPut that = (DocumentPut) o;
+ return document.equals(that.document) &&
+ getCondition().equals(that.getCondition());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(document, getCondition());
+ }
+
+ @Override
public String toString() {
return "put of document " + getId();
}
diff --git a/document/src/main/java/com/yahoo/document/TestAndSetCondition.java b/document/src/main/java/com/yahoo/document/TestAndSetCondition.java
index 6a189fc2969..a582807e38c 100644
--- a/document/src/main/java/com/yahoo/document/TestAndSetCondition.java
+++ b/document/src/main/java/com/yahoo/document/TestAndSetCondition.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -43,6 +44,19 @@ public class TestAndSetCondition {
}
@Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TestAndSetCondition that = (TestAndSetCondition) o;
+ return conditionStr.equals(that.conditionStr);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(conditionStr);
+ }
+
+ @Override
public String toString() {
StringBuilder string = new StringBuilder();
string.append("condition '");
diff --git a/document/src/main/java/com/yahoo/document/json/TokenBuffer.java b/document/src/main/java/com/yahoo/document/json/TokenBuffer.java
index e20845bfa54..88353139b0f 100644
--- a/document/src/main/java/com/yahoo/document/json/TokenBuffer.java
+++ b/document/src/main/java/com/yahoo/document/json/TokenBuffer.java
@@ -129,7 +129,7 @@ public class TokenBuffer {
add(t, tokens.getCurrentName(), tokens.getText());
} catch (IOException e) {
// TODO something sane
- throw new RuntimeException(e);
+ throw new IllegalArgumentException(e);
}
}
@@ -138,7 +138,7 @@ public class TokenBuffer {
return tokens.nextValue();
} catch (IOException e) {
// TODO something sane
- throw new RuntimeException(e);
+ throw new IllegalArgumentException(e);
}
}
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index f5f2a7c1845..a204da107f0 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -167,10 +167,15 @@
"public com.yahoo.documentapi.DocumentOperationParameters withFieldSet(java.lang.String)",
"public com.yahoo.documentapi.DocumentOperationParameters withRoute(java.lang.String)",
"public com.yahoo.documentapi.DocumentOperationParameters withTraceLevel(int)",
+ "public com.yahoo.documentapi.DocumentOperationParameters withResponseHandler(com.yahoo.documentapi.ResponseHandler)",
"public java.util.Optional priority()",
"public java.util.Optional fieldSet()",
"public java.util.Optional route()",
- "public java.util.OptionalInt traceLevel()"
+ "public java.util.OptionalInt traceLevel()",
+ "public java.util.Optional responseHandler()",
+ "public boolean equals(java.lang.Object)",
+ "public int hashCode()",
+ "public java.lang.String toString()"
],
"fields": []
},
@@ -969,17 +974,17 @@
"public void <init>(com.yahoo.documentapi.AsyncParameters, com.yahoo.documentapi.local.LocalDocumentAccess)",
"public double getCurrentWindowSize()",
"public com.yahoo.documentapi.Result put(com.yahoo.document.Document)",
- "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result put(com.yahoo.document.DocumentPut, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId)",
- "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, boolean, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
- "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result get(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId)",
- "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result remove(com.yahoo.document.DocumentId, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate)",
- "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
+ "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.DocumentOperationParameters)",
"public com.yahoo.documentapi.Response getNext()",
"public com.yahoo.documentapi.Response getNext(int)",
- "public void destroy()"
+ "public void destroy()",
+ "public void setResultType(com.yahoo.documentapi.Result$ResultType)"
],
"fields": []
},
@@ -991,12 +996,16 @@
],
"methods": [
"public void <init>(com.yahoo.documentapi.DocumentAccessParams)",
- "public com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
- "public com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
- "public com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
+ "public com.yahoo.documentapi.local.LocalSyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
+ "public com.yahoo.documentapi.local.LocalAsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
+ "public com.yahoo.documentapi.local.LocalVisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
"public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)",
"public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)",
- "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)"
+ "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)",
+ "public void setPhaser(java.util.concurrent.Phaser)",
+ "public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
+ "public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
+ "public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)"
],
"fields": []
},
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
index 3258c2f5b2c..1d934680586 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/DocumentOperationParameters.java
@@ -5,6 +5,7 @@ import com.yahoo.document.fieldset.FieldSet;
import com.yahoo.document.fieldset.FieldSetRepo;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
@@ -17,18 +18,21 @@ import static java.util.Objects.requireNonNull;
*/
public class DocumentOperationParameters {
- private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1);
+ private static final DocumentOperationParameters empty = new DocumentOperationParameters(null, null, null, -1, null);
private final DocumentProtocol.Priority priority;
private final String fieldSet;
private final String route;
private final int traceLevel;
+ private final ResponseHandler responseHandler;
- private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route, int traceLevel) {
+ private DocumentOperationParameters(DocumentProtocol.Priority priority, String fieldSet, String route,
+ int traceLevel, ResponseHandler responseHandler) {
this.priority = priority;
this.fieldSet = fieldSet;
this.route = route;
this.traceLevel = traceLevel;
+ this.responseHandler = responseHandler;
}
public static DocumentOperationParameters parameters() {
@@ -37,22 +41,22 @@ public class DocumentOperationParameters {
/** Sets the priority with which to perform an operation. */
public DocumentOperationParameters withPriority(DocumentProtocol.Priority priority) {
- return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel);
+ return new DocumentOperationParameters(requireNonNull(priority), fieldSet, route, traceLevel, responseHandler);
}
/** Sets the field set used for retrieval. */
public DocumentOperationParameters withFieldSet(FieldSet fieldSet) {
- return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel);
+ return new DocumentOperationParameters(priority, new FieldSetRepo().serialize(fieldSet), route, traceLevel, responseHandler);
}
/** Sets the field set used for retrieval. */
public DocumentOperationParameters withFieldSet(String fieldSet) {
- return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel);
+ return new DocumentOperationParameters(priority, requireNonNull(fieldSet), route, traceLevel, responseHandler);
}
/** Sets the route along which to send the operation. */
public DocumentOperationParameters withRoute(String route) {
- return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel);
+ return new DocumentOperationParameters(priority, fieldSet, requireNonNull(route), traceLevel, responseHandler);
}
/** Sets the trace level for an operation. */
@@ -60,12 +64,44 @@ public class DocumentOperationParameters {
if (traceLevel < 0 || traceLevel > 9)
throw new IllegalArgumentException("Trace level must be from 0 (no tracing) to 9 (maximum)");
- return new DocumentOperationParameters(priority, fieldSet, route, traceLevel);
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, responseHandler);
+ }
+
+ /** Sets the {@link ResponseHandler} to handle the {@link Response} of an async operation, instead of the session default. */
+ public DocumentOperationParameters withResponseHandler(ResponseHandler responseHandler) {
+ return new DocumentOperationParameters(priority, fieldSet, route, traceLevel, requireNonNull(responseHandler));
}
public Optional<DocumentProtocol.Priority> priority() { return Optional.ofNullable(priority); }
public Optional<String> fieldSet() { return Optional.ofNullable(fieldSet); }
public Optional<String> route() { return Optional.ofNullable(route); }
public OptionalInt traceLevel() { return traceLevel >= 0 ? OptionalInt.of(traceLevel) : OptionalInt.empty(); }
+ public Optional<ResponseHandler> responseHandler() { return Optional.ofNullable(responseHandler); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DocumentOperationParameters that = (DocumentOperationParameters) o;
+ return traceLevel == that.traceLevel &&
+ priority == that.priority &&
+ Objects.equals(fieldSet, that.fieldSet) &&
+ Objects.equals(route, that.route);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(priority, fieldSet, route, traceLevel);
+ }
+
+ @Override
+ public String toString() {
+ return "DocumentOperationParameters{" +
+ "priority=" + priority +
+ ", fieldSet='" + fieldSet + '\'' +
+ ", route='" + route + '\'' +
+ ", traceLevel=" + traceLevel +
+ '}';
+ }
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 40f26a82a89..8781e4a3a51 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -9,6 +9,7 @@ import com.yahoo.document.DocumentUpdate;
import com.yahoo.documentapi.AsyncParameters;
import com.yahoo.documentapi.AsyncSession;
import com.yahoo.documentapi.DocumentIdResponse;
+import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.DocumentResponse;
import com.yahoo.documentapi.DocumentUpdateResponse;
import com.yahoo.documentapi.RemoveResponse;
@@ -18,32 +19,39 @@ import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.SyncParameters;
import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.UpdateResponse;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static com.yahoo.documentapi.Result.ResultType.SUCCESS;
/**
* @author bratseth
+ * @author jonmv
*/
public class LocalAsyncSession implements AsyncSession {
- private final List<Response> responses = new LinkedList<>();
+ private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
private final ResponseHandler handler;
private final SyncSession syncSession;
- private long requestId = 0;
- private Random random = new Random();
+ private final Executor executor = Executors.newCachedThreadPool();
+ private final AtomicReference<Phaser> phaser;
- private synchronized long getNextRequestId() {
- requestId++;
- return requestId;
- }
+ private AtomicLong requestId = new AtomicLong(0);
+ private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
- random.setSeed(System.currentTimeMillis());
- syncSession = access.createSyncSession(new SyncParameters.Builder().build());
+ this.syncSession = access.createSyncSession(new SyncParameters.Builder().build());
+ this.phaser = access.phaser;
}
@Override
@@ -53,87 +61,85 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result put(Document document) {
- return put(new DocumentPut(document), DocumentProtocol.Priority.NORMAL_3);
+ return put(new DocumentPut(document), parameters());
}
@Override
- public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- syncSession.put(documentPut, pri);
- addResponse(new DocumentResponse(req, documentPut.getDocument()));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) {
+ return send(req -> {
+ try {
+ syncSession.put(documentPut, parameters);
+ return new DocumentResponse(req, documentPut.getDocument());
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
+ }
+ },
+ parameters);
}
@Override
public Result get(DocumentId id) {
- return get(id, false, DocumentProtocol.Priority.NORMAL_3);
+ return get(id, parameters());
}
@Override
- @Deprecated // TODO: Remove on Vespa 8
- public Result get(DocumentId id, boolean headersOnly, DocumentProtocol.Priority pri) {
- return get(id, pri);
- }
-
- @Override
- public Result get(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- addResponse(new DocumentResponse(req, syncSession.get(id)));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ public Result get(DocumentId id, DocumentOperationParameters parameters) {
+ return send(req -> {
+ try {
+ return new DocumentResponse(req, syncSession.get(id, parameters, null));
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
+ }
+ },
+ parameters);
}
@Override
public Result remove(DocumentId id) {
- return remove(id, DocumentProtocol.Priority.NORMAL_3);
+ return remove(id, parameters());
}
@Override
- public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.remove(new DocumentRemove(id), pri)) {
- addResponse(new RemoveResponse(req, true));
- } else {
- addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ public Result remove(DocumentId id, DocumentOperationParameters parameters) {
+ return send(req -> {
+ if (syncSession.remove(new DocumentRemove(id), parameters)) {
+ return new RemoveResponse(req, true);
+ }
+ else {
+ return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ },
+ parameters);
}
@Override
public Result update(DocumentUpdate update) {
- return update(update, DocumentProtocol.Priority.NORMAL_3);
+ return update(update, parameters());
}
@Override
- public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.update(update, pri)) {
- addResponse(new UpdateResponse(req, true));
- } else {
- addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ public Result update(DocumentUpdate update, DocumentOperationParameters parameters) {
+ return send(req -> {
+ if (syncSession.update(update, parameters)) {
+ return new UpdateResponse(req, true);
+ }
+ else {
+ return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ },
+ parameters);
}
@Override
public Response getNext() {
- if (responses.isEmpty()) {
- return null;
- }
- int index = random.nextInt(responses.size());
- return responses.remove(index);
+ return responses.poll();
}
@Override
- public Response getNext(int timeout) {
- return getNext();
+ public Response getNext(int timeoutMilliseconds) throws InterruptedException {
+ return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS);
}
@Override
@@ -141,6 +147,11 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
+ /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Responses to appear. */
+ public void setResultType(Result.ResultType resultType) {
+ this.result.set(resultType);
+ }
+
private void addResponse(Response response) {
if (handler != null) {
handler.handleResponse(response);
@@ -149,4 +160,29 @@ public class LocalAsyncSession implements AsyncSession {
}
}
+ private Result send(Function<Long, Response> responses, DocumentOperationParameters parameters) {
+ Result.ResultType resultType = result.get();
+ if (resultType != SUCCESS)
+ return new Result(resultType, new Error());
+
+ ResponseHandler responseHandler = parameters.responseHandler().orElse(this::addResponse);
+ long req = requestId.incrementAndGet();
+ Phaser synchronizer = phaser.get();
+ if (synchronizer == null)
+ responseHandler.handleResponse(responses.apply(req));
+ else {
+ synchronizer.register();
+ executor.execute(() -> {
+ try {
+ synchronizer.arriveAndAwaitAdvance();
+ responseHandler.handleResponse(responses.apply(req));
+ }
+ finally {
+ synchronizer.awaitAdvance(synchronizer.arriveAndDeregister());
+ }
+ });
+ }
+ return new Result(req);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index c69a8fb48de..6a0f2d6afc2 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -11,7 +11,6 @@ import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.SubscriptionParameters;
import com.yahoo.documentapi.SubscriptionSession;
import com.yahoo.documentapi.SyncParameters;
-import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.VisitorDestinationParameters;
import com.yahoo.documentapi.VisitorDestinationSession;
import com.yahoo.documentapi.VisitorParameters;
@@ -19,32 +18,38 @@ import com.yahoo.documentapi.VisitorSession;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicReference;
/**
- * The main class of the local implementation of the document api
+ * The main class of the local implementation of the document api.
+ * To easily obtain an instance of this, with the documents using the schemas (.sd-files) in a given directoy,
+ * use the {@code com.yahoo.vespa.application} test module and {@code DocumentAccesses.ofSchemas(schemaDirectory)}
*
* @author bratseth
+ * @author jonmv
*/
public class LocalDocumentAccess extends DocumentAccess {
- Map<DocumentId, Document> documents = new ConcurrentHashMap<>();
+ final Map<DocumentId, Document> documents = new ConcurrentHashMap<>();
+ final AtomicReference<Phaser> phaser = new AtomicReference<>();
public LocalDocumentAccess(DocumentAccessParams params) {
super(params);
}
@Override
- public SyncSession createSyncSession(SyncParameters parameters) {
+ public LocalSyncSession createSyncSession(SyncParameters parameters) {
return new LocalSyncSession(this);
}
@Override
- public AsyncSession createAsyncSession(AsyncParameters parameters) {
+ public LocalAsyncSession createAsyncSession(AsyncParameters parameters) {
return new LocalAsyncSession(parameters, this);
}
@Override
- public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
+ public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
return new LocalVisitorSession(this, parameters);
}
@@ -63,4 +68,42 @@ public class LocalDocumentAccess extends DocumentAccess {
throw new UnsupportedOperationException("Not supported yet");
}
+ /**
+ * Sets a {@link Phaser} for synchronization of otherwise async operations in sessions backed by this.
+ *
+ * {@link AsyncSession} and {@link VisitorSession} are by nature async. The {@link LocalAsyncSession} is, by default,
+ * synchronous, i.e., responses are sent by the thread that sends the document operations. {@link LocalVisitorSession},
+ * on the other hand, is asynchronous by default, i.e., all documents are sent by a dedicated sender thread.
+ * To enable more advanced testing using the {@link LocalDocumentAccess}, this method lets the user specify a
+ * {@link Phaser} used to synchronize the sending of documents from the visitor, and the responses for the
+ * document operations — which are then also done by a dedicated thread pool, instead of the caller thread.
+ *
+ * When this is set, a party is registered with the phaser for the sender thread (visit) or for each document
+ * operation (async-session). The thread that sends a document (visit) or response (async-session) then arrives
+ * and awaits advance before sending each response, so the user can trigger these documents and responses.
+ * After the document or response is delivered, the thread arrives and awaits advance, so the user
+ * can wait until the document or response has been delivered. This also ensures memory visibility.
+ * The visit sender thread deregisters when the whole visit is done; the async session threads after each operation.
+ * Example usage:
+ *
+ * <pre> {@code
+ * void testOperations(LocalDocumentAccess access) {
+ * List<Response> responses = new ArrayList<>();
+ * Phaser phaser = new Phaser(1); // "1" to register self
+ * access.setPhaser(phaser);
+ * AsyncSession session = access.createAsyncSession(new AsyncParameters().setReponseHandler(responses::add));
+ * session.put(documentPut);
+ * session.get(documentId);
+ * // Operations wait for this thread to arrive at "phaser"
+ * phaser.arriveAndAwaitAdvance(); // Let operations send their responses
+ * // "responses" may or may not hold the responses now
+ * phaser.arriveAndAwaitAdvance(); // Wait for operations to complete sending responses, memory visibility, etc.
+ * // "responses" now has responses from all previous operations
+ * phaser.arriveAndDeregister(); // Deregister so further operations flow freely
+ * }}</pre>
+ */
+ public void setPhaser(Phaser phaser) {
+ this.phaser.set(phaser);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index f087b646ca4..e0ae0278de8 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -23,6 +23,7 @@ import com.yahoo.yolean.Exceptions;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -42,6 +43,7 @@ public class LocalVisitorSession implements VisitorSession {
private final DocumentSelector selector;
private final FieldSet fieldSet;
private final AtomicReference<State> state;
+ private final AtomicReference<Phaser> phaser;
public LocalVisitorSession(LocalDocumentAccess access, VisitorParameters parameters) throws ParseException {
if (parameters.getResumeToken() != null)
@@ -64,11 +66,16 @@ public class LocalVisitorSession implements VisitorSession {
this.outstanding = new ConcurrentSkipListMap<>(Comparator.comparing(DocumentId::toString));
this.outstanding.putAll(access.documents);
this.state = new AtomicReference<>(State.RUNNING);
+ this.phaser = access.phaser;
start();
}
void start() {
+ Phaser synchronizer = phaser.get();
+ if (synchronizer != null)
+ synchronizer.register();
+
new Thread(() -> {
try {
// Iterate through all documents and pass on to data handler
@@ -76,14 +83,26 @@ public class LocalVisitorSession implements VisitorSession {
if (state.get() != State.RUNNING)
return;
- if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ try {
+ if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ return;
+ }
+ catch (RuntimeException e) {
return;
+ }
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
+
+ if (synchronizer != null)
+ synchronizer.arriveAndAwaitAdvance();
+
data.onMessage(new PutDocumentMessage(new DocumentPut(copy)),
new AckToken(id));
+
+ if (synchronizer != null)
+ synchronizer.arriveAndAwaitAdvance();
});
// Transition to a terminal state when done
state.updateAndGet(current -> {
@@ -107,6 +126,9 @@ public class LocalVisitorSession implements VisitorSession {
control.onDone(VisitorControlHandler.CompletionCode.FAILURE, Exceptions.toMessageString(e));
}
finally {
+ if (synchronizer != null)
+ synchronizer.arriveAndDeregister();
+
data.onDone();
}
}).start();
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
index 7a71089c180..0a4ab66aea4 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusAsyncSession.java
@@ -184,7 +184,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
Result send(Message msg, DocumentOperationParameters parameters) {
try {
long reqId = requestId.incrementAndGet();
- msg.setContext(reqId);
+ msg.setContext(new OperationContext(reqId, parameters.responseHandler().orElse(null)));
msg.getTrace().setLevel(parameters.traceLevel().orElse(traceLevel));
// Use route from parameters, or session route if non-default, or finally, defaults for get and non-get, if set. Phew!
String toRoute = parameters.route().orElse(mayOverrideWithGetOnlyRoute(msg) ? routeForGet : route);
@@ -198,6 +198,15 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
}
}
+ private static class OperationContext {
+ private final long reqId;
+ private final ResponseHandler responseHandler;
+ private OperationContext(long reqId, ResponseHandler responseHandler) {
+ this.reqId = reqId;
+ this.responseHandler = responseHandler;
+ }
+ }
+
/**
* A convenience method for assigning the internal trace level and route string to a message before sending it
* through the internal mbus session object.
@@ -206,7 +215,7 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
* @return the document api result object.
*/
public Result send(Message msg) {
- return send(msg, null);
+ return send(msg, parameters());
}
@Override
@@ -285,11 +294,6 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
new Error(mbusResult.getError().getMessage() + " (" + mbusResult.getError().getCode() + ")"));
}
- private static Response toResponse(Reply reply) {
- long reqId = (Long) reply.getContext();
- return reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId);
- }
-
private static Response toError(Reply reply, long reqId) {
boolean definitelyNotFound = reply instanceof UpdateDocumentReply && ! ((UpdateDocumentReply) reply).wasFound()
|| reply instanceof RemoveDocumentReply && ! ((RemoveDocumentReply) reply).wasFound();
@@ -346,8 +350,13 @@ public class MessageBusAsyncSession implements MessageBusSession, AsyncSession {
if (reply.getTrace().getLevel() > 0) {
log.log(Level.INFO, reply.getTrace().toString());
}
- Response response = toResponse(reply);
- if (handler != null) {
+ OperationContext context = (OperationContext) reply.getContext();
+ long reqId = context.reqId;
+ Response response = reply.hasErrors() ? toError(reply, reqId) : toSuccess(reply, reqId);
+ ResponseHandler operationSpecificResponseHandler = context.responseHandler;
+ if (operationSpecificResponseHandler != null)
+ operationSpecificResponseHandler.handleResponse(response);
+ else if (handler != null) {
handler.handleResponse(response);
} else {
queue.add(response);
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 69dc7c6da74..b4e17038a35 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -36,6 +36,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -85,17 +91,22 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
}
@Test
- public void testAsyncFetch() {
- AsyncSession session = access.createAsyncSession(new AsyncParameters());
+ public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException {
+ LocalAsyncSession session = access.createAsyncSession(new AsyncParameters());
List<DocumentId> ids = new ArrayList<>();
ids.add(new DocumentId("id:music:music::1"));
ids.add(new DocumentId("id:music:music::2"));
ids.add(new DocumentId("id:music:music::3"));
for (DocumentId id : ids)
session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
- int timeout = 100;
+
+ // Let all async operations wait for a signal from the test thread before sending their responses, and let test
+ // thread wait for all responses to be delivered afterwards.
+ Phaser phaser = new Phaser(1);
+ access.setPhaser(phaser);
long startTime = System.currentTimeMillis();
+ int timeoutMillis = 1000;
Set<Long> outstandingRequests = new HashSet<>();
for (DocumentId id : ids) {
Result result = session.get(id);
@@ -104,27 +115,38 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
outstandingRequests.add(result.getRequestId());
}
- List<Document> documents = new ArrayList<>();
- try {
- while ( ! outstandingRequests.isEmpty()) {
- int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
- Response response = session.getNext(timeout - timeSinceStart);
- if (response == null)
- throw new RuntimeException("Timed out waiting for documents"); // or return what you have
- if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
-
- if (response.isSuccess())
- documents.add(((DocumentResponse)response).getDocument());
- outstandingRequests.remove(response.getRequestId());
+ // Wait for responses in separate thread.
+ Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ List<Document> documents = new ArrayList<>();
+ while ( ! outstandingRequests.isEmpty()) {
+ int timeSinceStart = (int) (System.currentTimeMillis() - startTime);
+ Response response = session.getNext(timeoutMillis - timeSinceStart);
+ if (response == null)
+ throw new RuntimeException("Timed out waiting for documents"); // or return what you have
+ if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
+
+ if (response.isSuccess())
+ documents.add(((DocumentResponse) response).getDocument());
+ outstandingRequests.remove(response.getRequestId());
+ }
+ assertEquals(3, documents.size());
+ for (Document document : documents)
+ assertNotNull(document);
}
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for documents", e);
- }
+ catch (InterruptedException e) {
+ throw new IllegalArgumentException("Interrupted while waiting for responses");
+ }
+ });
+
+ // All operations, and receiver, now waiting for this thread to arrive.
+ assertEquals(4, phaser.getRegisteredParties());
+ assertEquals(0, phaser.getPhase());
+ phaser.arrive();
+ assertEquals(1, phaser.getPhase());
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
- assertEquals(3, documents.size());
- for (Document document : documents)
- assertNotNull(document);
+ futureWithAssertions.get(1000, TimeUnit.MILLISECONDS);
}
@Test
diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java
index 4a87217e08f..eebaf1f579f 100644
--- a/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java
+++ b/jdisc_core/src/main/java/com/yahoo/jdisc/test/TestDriver.java
@@ -7,19 +7,33 @@ import com.google.inject.binder.AnnotatedBindingBuilder;
import com.yahoo.jdisc.Container;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.application.*;
+import com.yahoo.jdisc.application.Application;
+import com.yahoo.jdisc.application.ContainerActivator;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.jdisc.application.DeactivatedContainer;
+import com.yahoo.jdisc.application.OsgiFramework;
import com.yahoo.jdisc.core.ApplicationLoader;
import com.yahoo.jdisc.core.BootstrapLoader;
import com.yahoo.jdisc.core.FelixFramework;
import com.yahoo.jdisc.core.FelixParams;
-import com.yahoo.jdisc.handler.*;
+import com.yahoo.jdisc.handler.BindingNotFoundException;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDeniedException;
+import com.yahoo.jdisc.handler.RequestDispatch;
+import com.yahoo.jdisc.handler.RequestHandler;
+import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.service.CurrentContainer;
import java.net.URI;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -27,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* {@link BootstrapLoader} that provides convenient access to the {@link ContainerActivator} and {@link
* CurrentContainer} interfaces. A typical test case using this class looks as follows:</p>
* <pre>
- * {@literal @}Test
+ *{@literal @}Test
* public void requireThatMyComponentIsWellBehaved() {
* TestDriver driver = TestDriver.newSimpleApplicationInstanceWithoutOsgi();
* ContainerBuilder builder = driver.newContainerBuilder();
diff --git a/testutil/src/main/java/com/yahoo/test/ManualClock.java b/testutil/src/main/java/com/yahoo/test/ManualClock.java
index ffef6895c38..ba7d9698d72 100644
--- a/testutil/src/main/java/com/yahoo/test/ManualClock.java
+++ b/testutil/src/main/java/com/yahoo/test/ManualClock.java
@@ -9,6 +9,7 @@ import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAmount;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A clock which initially has the time of its creation but can only be advanced by calling advance
@@ -17,7 +18,7 @@ import java.time.temporal.TemporalAmount;
*/
public class ManualClock extends Clock {
- private Instant currentTime = Instant.now();
+ private AtomicReference<Instant> currentTime = new AtomicReference<>(Instant.now());
@Inject
public ManualClock() {}
@@ -27,19 +28,19 @@ public class ManualClock extends Clock {
}
public ManualClock(Instant currentTime) {
- this.currentTime = currentTime;
+ setInstant(currentTime);
}
public void advance(TemporalAmount temporal) {
- currentTime = currentTime.plus(temporal);
+ currentTime.updateAndGet(time -> time.plus(temporal));
}
public void setInstant(Instant time) {
- currentTime = time;
+ currentTime.set(time);
}
@Override
- public Instant instant() { return currentTime; }
+ public Instant instant() { return currentTime.get(); }
@Override
public ZoneId getZone() { return null; }
@@ -48,7 +49,7 @@ public class ManualClock extends Clock {
public Clock withZone(ZoneId zone) { return null; }
@Override
- public long millis() { return currentTime.toEpochMilli(); }
+ public long millis() { return instant().toEpochMilli(); }
public static Instant at(String utcIsoTime) {
return LocalDateTime.parse(utcIsoTime, DateTimeFormatter.ISO_DATE_TIME).atZone(ZoneOffset.UTC).toInstant();
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
index 57b28cecdca..f69bdc2a91d 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/communication/ApacheGatewayConnection.java
@@ -240,7 +240,7 @@ class ApacheGatewayConnection implements GatewayConnection {
httpPost.addHeader(headerName, headerValue);
});
- if (useCompression) {
+ if (useCompression) { // This causes the apache client to gzip the request content. Weird, huh?
httpPost.setHeader("Content-Encoding", "gzip");
}
return httpPost;
diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml
index 9c4b81da806..8254c208588 100644
--- a/vespaclient-container-plugin/pom.xml
+++ b/vespaclient-container-plugin/pom.xml
@@ -72,6 +72,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>testutil</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
new file mode 100644
index 00000000000..8b0c966c46f
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
@@ -0,0 +1,310 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.text.Text;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+/**
+ * Wraps the document API with an executor that can retry and time out document operations,
+ * as well as compute the required visitor parameters for visitor sessions.
+ *
+ * @author jonmv
+ */
+public interface DocumentOperationExecutor {
+
+ default void shutdown() { }
+
+ void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context);
+
+ void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context);
+
+ void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context);
+
+ void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context);
+
+ void visit(VisitorOptions options, VisitOperationsContext context);
+
+ String routeToCluster(String cluster);
+
+ enum ErrorType {
+ OVERLOAD,
+ NOT_FOUND,
+ PRECONDITION_FAILED,
+ BAD_REQUEST,
+ TIMEOUT,
+ ERROR;
+ }
+
+
+ /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
+ class Context<T> {
+
+ private final AtomicBoolean handled = new AtomicBoolean();
+ private final BiConsumer<ErrorType, String> onError;
+ private final Consumer<T> onSuccess;
+
+ Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
+ this.onError = onError;
+ this.onSuccess = onSuccess;
+ }
+
+ public void error(ErrorType type, String message) {
+ if ( ! handled.getAndSet(true))
+ onError.accept(type, message);
+ }
+
+ public void success(T result) {
+ if ( ! handled.getAndSet(true))
+ onSuccess.accept(result);
+ }
+
+ public boolean handled() {
+ return handled.get();
+ }
+
+ }
+
+
+ /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
+ class VisitOperationsContext extends Context<Optional<String>> {
+
+ private final Consumer<Document> onDocument;
+
+ public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) {
+ super(onError, onSuccess);
+ this.onDocument = onDocument;
+ }
+
+ public void document(Document document) {
+ if ( ! handled())
+ onDocument.accept(document);
+ }
+
+ }
+
+
+ /** Context for a document operation. */
+ class OperationContext extends Context<Optional<Document>> {
+
+ public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
+ super(onError, onSuccess);
+ }
+
+ }
+
+
+ class VisitorOptions {
+
+ final Optional<String> cluster;
+ final Optional<String> namespace;
+ final Optional<String> documentType;
+ final Optional<Group> group;
+ final Optional<String> selection;
+ final Optional<String> fieldSet;
+ final Optional<String> continuation;
+ final Optional<String> bucketSpace;
+ final Optional<Integer> wantedDocumentCount;
+ final Optional<Integer> concurrency;
+
+ private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
+ Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
+ Optional<String> continuation, Optional<String> bucketSpace,
+ Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
+ this.cluster = cluster;
+ this.namespace = namespace;
+ this.documentType = documentType;
+ this.group = group;
+ this.selection = selection;
+ this.fieldSet = fieldSet;
+ this.continuation = continuation;
+ this.bucketSpace = bucketSpace;
+ this.wantedDocumentCount = wantedDocumentCount;
+ this.concurrency = concurrency;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ VisitorOptions that = (VisitorOptions) o;
+ return cluster.equals(that.cluster) &&
+ namespace.equals(that.namespace) &&
+ documentType.equals(that.documentType) &&
+ group.equals(that.group) &&
+ selection.equals(that.selection) &&
+ fieldSet.equals(that.fieldSet) &&
+ continuation.equals(that.continuation) &&
+ bucketSpace.equals(that.bucketSpace) &&
+ wantedDocumentCount.equals(that.wantedDocumentCount) &&
+ concurrency.equals(that.concurrency);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency);
+ }
+
+ @Override
+ public String toString() {
+ return "VisitorOptions{" +
+ "cluster=" + cluster +
+ ", namespace=" + namespace +
+ ", documentType=" + documentType +
+ ", group=" + group +
+ ", selection=" + selection +
+ ", fieldSet=" + fieldSet +
+ ", continuation=" + continuation +
+ ", bucketSpace=" + bucketSpace +
+ ", wantedDocumentCount=" + wantedDocumentCount +
+ ", concurrency=" + concurrency +
+ '}';
+ }
+
+ public static Builder builder() { return new Builder(); }
+
+
+ public static class Builder {
+
+ private String cluster;
+ private String documentType;
+ private String namespace;
+ private Group group;
+ private String selection;
+ private String fieldSet;
+ private String continuation;
+ private String bucketSpace;
+ private Integer wantedDocumentCount;
+ private Integer concurrency;
+
+ public Builder cluster(String cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+
+ public Builder documentType(String documentType) {
+ this.documentType = documentType;
+ return this;
+ }
+
+ public Builder namespace(String namespace) {
+ this.namespace = namespace;
+ return this;
+ }
+
+ public Builder group(Group group) {
+ this.group = group;
+ return this;
+ }
+
+ public Builder selection(String selection) {
+ this.selection = selection;
+ return this;
+ }
+
+ public Builder fieldSet(String fieldSet) {
+ this.fieldSet = fieldSet;
+ return this;
+ }
+
+ public Builder continuation(String continuation) {
+ this.continuation = continuation;
+ return this;
+ }
+
+ public Builder bucketSpace(String bucketSpace) {
+ this.bucketSpace = bucketSpace;
+ return this;
+ }
+
+ public Builder wantedDocumentCount(Integer wantedDocumentCount) {
+ this.wantedDocumentCount = wantedDocumentCount;
+ return this;
+ }
+
+ public Builder concurrency(Integer concurrency) {
+ this.concurrency = concurrency;
+ return this;
+ }
+
+ public VisitorOptions build() {
+ return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
+ Optional.ofNullable(namespace), Optional.ofNullable(group),
+ Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
+ Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
+ Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
+ }
+
+ }
+
+ }
+
+
+ class Group {
+
+ private final String value;
+ private final String docIdPart;
+ private final String selection;
+
+ private Group(String value, String docIdPart, String selection) {
+ Text.validateTextString(value)
+ .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
+ this.value = value;
+ this.docIdPart = docIdPart;
+ this.selection = selection;
+ }
+
+ public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
+ public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
+
+ public String value() { return value; }
+ public String docIdPart() { return docIdPart; }
+ public String selection() { return selection; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Group group = (Group) o;
+ return value.equals(group.value) &&
+ docIdPart.equals(group.docIdPart) &&
+ selection.equals(group.selection);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, docIdPart, selection);
+ }
+
+ @Override
+ public String toString() {
+ return "Group{" +
+ "value='" + value + '\'' +
+ ", docIdPart='" + docIdPart + '\'' +
+ ", selection='" + selection + '\'' +
+ '}';
+ }
+
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
new file mode 100644
index 00000000000..135b6a824c8
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
@@ -0,0 +1,496 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.DocumentResponse;
+import com.yahoo.documentapi.DumpVisitorDataHandler;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.ResponseHandler;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.SEVERE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Encapsulates a document access and supports running asynchronous document
+ * operations and visits against this, with retries and optional timeouts.
+ *
+ * @author jonmv
+ */
+public class DocumentOperationExecutorImpl implements DocumentOperationExecutor {
+
+ private static final Logger log = Logger.getLogger(DocumentOperationExecutorImpl.class.getName());
+
+ private final Duration visitTimeout;
+ private final long maxThrottled;
+ private final DocumentAccess access;
+ private final AsyncSession asyncSession;
+ private final Map<String, StorageCluster> clusters;
+ private final Clock clock;
+ private final DelayQueue throttled;
+ private final DelayQueue timeouts;
+ private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
+
+ public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
+ DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
+ this(Duration.ofMillis(executorConfig.resendDelayMillis()),
+ Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
+ Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
+ executorConfig.maxThrottled(),
+ access,
+ parseClusters(clustersConfig, bucketsConfig),
+ clock);
+ }
+
+ DocumentOperationExecutorImpl(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
+ DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
+ this.visitTimeout = requireNonNull(visitTimeout);
+ this.maxThrottled = maxThrottled;
+ this.access = requireNonNull(access);
+ this.asyncSession = access.createAsyncSession(new AsyncParameters());
+ this.clock = requireNonNull(clock);
+ this.clusters = Map.copyOf(clusters);
+ this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock, "throttle");
+ this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> {
+ context.error(TIMEOUT, "Timed out after " + defaultTimeout);
+ return true;
+ }, defaultTimeout, clock, "timeout");
+ }
+
+ private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) {
+ if (options.cluster.isEmpty() && options.documentType.isEmpty())
+ throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+
+ VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection,
+ options.documentType,
+ options.namespace.map(value -> "id.namespace=='" + value + "'"),
+ options.group.map(Group::selection))
+ .flatMap(Optional::stream)
+ .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
+ StringJoiner::add,
+ StringJoiner::merge)
+ .toString());
+
+ options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
+ parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
+ options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
+ parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10));
+ options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1)));
+ parameters.setTimeoutMs(visitTimeout.toMillis());
+ parameters.visitInconsistentBuckets(true);
+ parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
+
+ StorageCluster storageCluster = resolveCluster(options.cluster, clusters);
+ parameters.setRoute(storageCluster.route());
+ parameters.setBucketSpace(resolveBucket(storageCluster,
+ options.documentType,
+ List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
+ options.bucketSpace));
+
+ return parameters;
+ }
+
+ /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
+ @Override
+ public void shutdown() {
+ long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
+ visits.values().forEach(VisitorSession::destroy);
+ Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
+ context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
+ Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
+ context -> context.error(TIMEOUT, "Timed out due to shutdown"));
+ try {
+ throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throttleShutdown.cancel(true);
+ throttleShutdown.cancel(true);
+ log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
+ }
+ }
+
+ @Override
+ public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.get(id, parameters.withResponseHandler(handlerOf(parameters, context))), context);
+ }
+
+ @Override
+ public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.put(put, parameters.withResponseHandler(handlerOf(parameters, context))), context);
+ }
+
+ @Override
+ public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.update(update, parameters.withResponseHandler(handlerOf(parameters, context))), context);
+ }
+
+ @Override
+ public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.remove(id, parameters.withResponseHandler(handlerOf(parameters, context))), context);
+ }
+
+ @Override
+ public void visit(VisitorOptions options, VisitOperationsContext context) {
+ try {
+ AtomicBoolean done = new AtomicBoolean(false);
+ VisitorParameters parameters = asParameters(options, clusters, visitTimeout);
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
+ @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
+ });
+ parameters.setControlHandler(new VisitorControlHandler() {
+ @Override public void onDone(CompletionCode code, String message) {
+ super.onDone(code, message);
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets())
+ context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
+ case SUCCESS: // intentional fallthrough
+ case ABORTED:
+ context.success(Optional.ofNullable(getProgress())
+ .filter(progress -> ! progress.isFinished())
+ .map(ProgressToken::serializeToString));
+ break;
+ default:
+ context.error(ERROR, message != null ? message : "Visiting failed");
+ }
+ done.set(true); // This may be reached before dispatching thread is done putting us in the map.
+ visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
+ }
+ });
+ visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ if (done.get())
+ visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
+ }
+ catch (IllegalArgumentException | ParseException e) {
+ context.error(BAD_REQUEST, Exceptions.toMessageString(e));
+ }
+ catch (RuntimeException e) {
+ context.error(ERROR, Exceptions.toMessageString(e));
+ }
+ }
+
+ @Override
+ public String routeToCluster(String cluster) {
+ return resolveCluster(Optional.of(cluster), clusters).route();
+ }
+
+ private ResponseHandler handlerOf(DocumentOperationParameters parameters, OperationContext context) {
+ return response -> {
+ parameters.responseHandler().ifPresent(originalHandler -> originalHandler.handleResponse(response));
+ if (response.isSuccess())
+ context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
+ : Optional.empty());
+ else
+ context.error(toErrorType(response.outcome()), response.getTextMessage());
+ };
+ }
+
+ /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
+ private void accept(Supplier<Result> operation, OperationContext context) {
+ timeouts.add(operation, context);
+ if (throttled.size() > 0 || ! send(operation, context))
+ if ( ! throttled.add(operation, context))
+ context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
+ }
+
+ /** Attempts to send the given operation through the async session of this, returning {@code false} if throttled. */
+ private boolean send(Supplier<Result> operation, OperationContext context) {
+ Result result = operation.get();
+ switch (result.type()) {
+ case SUCCESS:
+ return true;
+ case TRANSIENT_ERROR:
+ return false;
+ default:
+ log.log(WARNING, "Unknown result type '" + result.type() + "'");
+ case FATAL_ERROR: // intentional fallthrough
+ context.error(ERROR, result.getError().getMessage());
+ return true; // Request handled, don't retry.
+ }
+ }
+
+ private static ErrorType toErrorType(Response.Outcome outcome) {
+ switch (outcome) {
+ case NOT_FOUND:
+ return NOT_FOUND;
+ case CONDITION_FAILED:
+ return PRECONDITION_FAILED;
+ default:
+ log.log(WARNING, "Unexpected response outcome: " + outcome);
+ case ERROR: // intentional fallthrough
+ return ERROR;
+ }
+ }
+
+
+ /**
+ * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
+ *
+ * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
+ * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
+ */
+ static class DelayQueue {
+
+ private final long maxSize;
+ private final Clock clock;
+ private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
+ private final AtomicLong size = new AtomicLong(0);
+ private final Thread maintainer;
+ private final Duration delay;
+ private final long defaultWaitMillis;
+
+ public DelayQueue(long maxSize, BiPredicate<Supplier<Result>, OperationContext> action,
+ Duration delay, Clock clock, String threadName) {
+ if (maxSize < 0)
+ throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
+ if (delay.isNegative())
+ throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
+
+ this.maxSize = maxSize;
+ this.delay = delay;
+ this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
+ this.clock = requireNonNull(clock);
+ this.maintainer = new DaemonThreadFactory("document-operation-executor-" + threadName).newThread(() -> maintain(action));
+ this.maintainer.start();
+ }
+
+ boolean add(Supplier<Result> operation, OperationContext context) {
+ if (size.incrementAndGet() > maxSize) {
+ size.decrementAndGet();
+ return false;
+ }
+ return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
+ }
+
+ long size() { return size.get(); }
+
+ Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
+ ExecutorService shutdownService = Executors.newSingleThreadExecutor();
+ Future<?> future = shutdownService.submit(() -> {
+ try {
+ long doomMillis = clock.millis() + grace.toMillis();
+ while (size.get() > 0 && clock.millis() < doomMillis)
+ Thread.sleep(100);
+ }
+ finally {
+ maintainer.interrupt();
+ for (Delayed delayed; (delayed = queue.poll()) != null; ) {
+ size.decrementAndGet();
+ onShutdown.accept(delayed.context());
+ }
+ }
+ return null;
+ });
+ shutdownService.shutdown();
+ return future;
+ }
+
+ /**
+ * Repeatedly loops through the queue, evicting already handled entries and processing those
+ * which have become ready since last time, then waits until new items are guaranteed to be ready,
+ * or until it's time for a new run just to ensure GC of handled entries.
+ * The entries are assumed to always be added to the back of the queue, with the same delay.
+ * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
+ * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
+ */
+ private void maintain(BiPredicate<Supplier<Result>, OperationContext> action) {
+ while ( ! Thread.currentThread().isInterrupted()) {
+ try {
+ Instant waitUntil = null;
+ Iterator<Delayed> operations = queue.iterator();
+ boolean rejected = false;
+ while (operations.hasNext()) {
+ Delayed delayed = operations.next();
+ // Already handled: remove and continue.
+ if (delayed.context().handled()) {
+ operations.remove();
+ size.decrementAndGet();
+ continue;
+ }
+ // Ready for action: remove from queue and run unless an operation was already rejected.
+ if (delayed.readyAt().isBefore(clock.instant()) && ! rejected) {
+ if (action.test(delayed.operation(), delayed.context())) {
+ operations.remove();
+ size.decrementAndGet();
+ continue;
+ }
+ else { // If an operation is rejected, handle no more this run, and wait a short while before retrying.
+ waitUntil = clock.instant().plus(Duration.ofMillis(10));
+ rejected = true;
+ }
+ }
+ // Not yet ready for action: keep time to wake up again.
+ waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
+ }
+ long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
+ synchronized (this) {
+ do {
+ notify();
+ wait(Math.max(0, waitUntilMillis - clock.millis()));
+ }
+ while (clock.millis() < waitUntilMillis);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (Exception e) {
+ log.log(SEVERE, "Exception caught by delay queue maintainer", e);
+ }
+ }
+ }
+ }
+
+
+ private static class Delayed {
+
+ private final Supplier<Result> operation;
+ private final OperationContext context;
+ private final Instant readyAt;
+
+ Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
+ this.readyAt = requireNonNull(readyAt);
+ this.context = requireNonNull(context);
+ this.operation = requireNonNull(operation);
+ }
+
+ Supplier<Result> operation() { return operation; }
+ OperationContext context() { return context; }
+ Instant readyAt() { return readyAt; }
+
+ }
+
+
+ static class StorageCluster {
+
+ private final String name;
+ private final String configId;
+ private final Map<String, String> documentBuckets;
+
+ StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
+ this.name = requireNonNull(name);
+ this.configId = requireNonNull(configId);
+ this.documentBuckets = Map.copyOf(documentBuckets);
+ }
+
+ String name() { return name; }
+ String configId() { return configId; }
+ String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
+ Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
+
+ }
+
+
+ static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
+ if (clusters.isEmpty())
+ throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled");
+
+ return wanted.map(cluster -> {
+ if ( ! clusters.containsKey(cluster))
+ throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.get(cluster);
+ }).orElseGet(() -> {
+ if (clusters.size() > 1)
+ throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.values().iterator().next();
+ });
+ }
+
+ private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
+ List<String> bucketSpaces, Optional<String> bucketSpace) {
+ return documentType.map(type -> cluster.bucketOf(type)
+ .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
+ "' is not mapped to a known bucket space")))
+ .or(() -> bucketSpace.map(space -> {
+ if ( ! bucketSpaces.contains(space))
+ throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
+ String.join(", ", bucketSpaces));
+ return space;
+ }))
+ .orElse(FixedBucketSpaces.defaultSpace());
+ }
+
+
+
+ private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
+ return clusters.storage().stream()
+ .collect(toUnmodifiableMap(storage -> storage.name(),
+ storage -> new StorageCluster(storage.name(),
+ storage.configid(),
+ buckets.cluster(storage.name())
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> entry.getKey(),
+ entry -> entry.getValue().bucketSpace())))));
+ }
+
+
+ // Visible for testing.
+ AsyncSession asyncSession() { return asyncSession; }
+ Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
+ void notifyMaintainers() throws InterruptedException {
+ synchronized (throttled) { throttled.notify(); throttled.wait(); }
+ synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
new file mode 100644
index 00000000000..809e3522652
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -0,0 +1,620 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi.resource;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.container.core.documentapi.VespaDocumentAccess;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.json.JsonReader;
+import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.json.document.DocumentParser;
+import com.yahoo.document.restapi.DocumentOperationExecutor;
+import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
+import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
+import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.documentapi.metrics.DocumentOperationStatus;
+import com.yahoo.documentapi.metrics.DocumentOperationType;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ReadableContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.handler.UnsafeContentInputStream;
+import com.yahoo.container.core.HandlerMetricContextUtil;
+import com.yahoo.jdisc.http.HttpRequest;
+import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.restapi.Path;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.logging.Logger;
+
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
+import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
+import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
+import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
+import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Asynchronous HTTP handler for /document/v1/
+ *
+ * @author jonmv
+ */
+public class DocumentV1ApiHandler extends AbstractRequestHandler {
+
+ private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName());
+ private static final Parser<Integer> numberParser = Integer::parseInt;
+ private static final Parser<Boolean> booleanParser = Boolean::parseBoolean;
+
+ private static final CompletionHandler logException = new CompletionHandler() {
+ @Override public void completed() { }
+ @Override public void failed(Throwable t) {
+ log.log(FINE, () -> "Exception writing or closing response data: " + Exceptions.toMessageString(t));
+ }
+ };
+
+ private static final ContentChannel ignoredContent = new ContentChannel() {
+ @Override public void write(ByteBuffer buf, CompletionHandler handler) { handler.completed(); }
+ @Override public void close(CompletionHandler handler) { handler.completed(); }
+ };
+
+ private static final String CREATE = "create";
+ private static final String CONDITION = "condition";
+ private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get
+ private static final String FIELD_SET = "fieldSet";
+ private static final String SELECTION = "selection";
+ private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get
+ private static final String CONTINUATION = "continuation";
+ private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
+ private static final String CONCURRENCY = "concurrency";
+ private static final String BUCKET_SPACE = "bucketSpace";
+
+ private final Clock clock;
+ private final Metric metric; // TODO jonmv: make response class which logs on completion/error
+ private final DocumentApiMetrics metrics;
+ private final DocumentOperationExecutor executor;
+ private final DocumentOperationParser parser;
+ private final Map<String, Map<Method, Handler>> handlers;
+
+ @Inject
+ public DocumentV1ApiHandler(Clock clock,
+ Metric metric,
+ MetricReceiver metricReceiver,
+ VespaDocumentAccess documentAccess,
+ DocumentmanagerConfig documentManagerConfig,
+ ClusterListConfig clusterListConfig,
+ AllClustersBucketSpacesConfig bucketSpacesConfig,
+ DocumentOperationExecutorConfig executorConfig) {
+ this(clock,
+ new DocumentOperationExecutorImpl(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
+ new DocumentOperationParser(documentManagerConfig),
+ metric,
+ metricReceiver);
+ }
+
+ DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser,
+ Metric metric, MetricReceiver metricReceiver) {
+ this.clock = clock;
+ this.executor = executor;
+ this.parser = parser;
+ this.metric = metric;
+ this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+ this.handlers = defineApi();
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) {
+ HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass());
+ ResponseHandler responseHandler = response -> {
+ HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass());
+ return rawResponseHandler.handleResponse(response);
+ };
+
+ HttpRequest request = (HttpRequest) rawRequest;
+ try {
+ Path requestPath = new Path(request.getUri());
+ for (String path : handlers.keySet())
+ if (requestPath.matches(path)) {
+ Map<Method, Handler> methods = handlers.get(path);
+ if (methods.containsKey(request.getMethod()))
+ return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
+
+ if (request.getMethod() == OPTIONS)
+ return options(methods.keySet(), responseHandler);
+
+ return methodNotAllowed(request, methods.keySet(), responseHandler);
+ }
+ return notFound(request, handlers.keySet(), responseHandler);
+ }
+ catch (IllegalArgumentException e) {
+ return badRequest(request, e, responseHandler);
+ }
+ catch (RuntimeException e) {
+ return serverError(request, e, responseHandler);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ this.executor.shutdown();
+ }
+
+ private Map<String, Map<Method, Handler>> defineApi() {
+ Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
+
+ handlers.put("/document/v1/",
+ Map.of(GET, this::getRoot));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ return Collections.unmodifiableMap(handlers);
+ }
+
+ private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ VisitorOptions.Builder options = parseOptions(request, path);
+ options = options.documentType(path.documentType());
+ options = options.namespace(path.namespace());
+ options = path.group().map(options::group).orElse(options);
+ executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) {
+ Object monitor = new Object();
+ return new VisitOperationsContext((type, message) -> {
+ synchronized (monitor) {
+ handleError(request, type, message, root, handler);
+ }
+ },
+ token -> {
+ token.ifPresent(value -> root.setString("continuation", value));
+ synchronized (monitor) {
+ respond(root, handler);
+ }
+ },
+ // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway
+ document -> {
+ try {
+ synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization.
+ SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(),
+ documents.addObject());
+ }
+ }
+ // TODO jonmv: This shouldn't happen much, but ... expose errors too?
+ catch (RuntimeException e) {
+ log.log(WARNING, "Exception serializing document in document/v1 visit response", e);
+ }
+ });
+ }
+
+ private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ DocumentId id = path.id();
+ DocumentOperationParameters parameters = parameters();
+ parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters);
+ parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters);
+ executor.get(id,
+ parameters,
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ document -> {
+ try {
+ Cursor root = responseRoot(request, id);
+ document.map(JsonWriter::toByteArray)
+ .map(SlimeUtils::jsonToSlime)
+ .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields")));
+ respond(document.isPresent() ? 200 : 404,
+ root,
+ handler);
+ }
+ catch (Exception e) {
+ serverError(request, new RuntimeException(e), handler);
+ }
+ }));
+ return ignoredContent;
+ }
+
+ private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.PUT, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentPut put = parser.parsePut(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
+ executor.put(put,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, e, handler);
+ }
+ catch (RuntimeException e) {
+ serverError(request, e, handler);
+ }
+ });
+ }
+
+ private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.UPDATE, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentUpdate update = parser.parseUpdate(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
+ getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
+ executor.update(update,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, e, handler);
+ }
+ catch (RuntimeException e) {
+ serverError(request, e, handler);
+ }
+ });
+ }
+
+ private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.REMOVE, clock.instant());
+ executor.remove(id,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ return ignoredContent;
+ }
+
+ private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) {
+ switch (type) {
+ case BAD_REQUEST:
+ badRequest(request, message, root, handler);
+ break;
+ case NOT_FOUND:
+ notFound(request, message, root, handler);
+ break;
+ case PRECONDITION_FAILED:
+ preconditionFailed(request, message, root, handler);
+ break;
+ case OVERLOAD:
+ overload(request, message, root, handler);
+ break;
+ case TIMEOUT:
+ timeout(request, message, root, handler);
+ break;
+ default:
+ log.log(WARNING, "Unexpected error type '" + type + "'");
+ case ERROR: // intentional fallthrough
+ serverError(request, message, root, handler);
+ }
+ }
+
+ // ------------------------------------------------ Responses ------------------------------------------------
+
+ private static Cursor responseRoot(HttpRequest request) {
+ Cursor root = new Slime().setObject();
+ root.setString("pathId", request.getUri().getRawPath());
+ return root;
+ }
+
+ private static Cursor responseRoot(HttpRequest request, DocumentId id) {
+ Cursor root = responseRoot(request);
+ root.setString("id", id.toString());
+ return root;
+ }
+
+ private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) {
+ Response response = new Response(Response.Status.NO_CONTENT);
+ response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
+ handler.handleResponse(response).close(logException);
+ return ignoredContent;
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
+ return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler);
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.BAD_REQUEST, root, handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
+ return notFound(request,
+ "Nothing at '" + request.getUri().getRawPath() + "'. " +
+ "Available paths are:\n" + String.join("\n", paths),
+ responseRoot(request),
+ handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.NOT_FOUND, root, handler);
+ }
+
+ private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ root.setString("message",
+ "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
+ "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")));
+ return respond(Response.Status.METHOD_NOT_ALLOWED,
+ root,
+ handler);
+ }
+
+ private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.PRECONDITION_FAILED, root, handler);
+ }
+
+ private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.TOO_MANY_REQUESTS, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e);
+ Cursor root = responseRoot(request);
+ root.setString("message", Exceptions.toMessageString(e));
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.GATEWAY_TIMEOUT, root, handler);
+ }
+
+ private static ContentChannel respond(Inspector root, ResponseHandler handler) {
+ return respond(200, root, handler);
+ }
+
+ private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) {
+ Response response = new Response(status);
+ response.headers().put("Content-Type", "application/json; charset=UTF-8");
+ ContentChannel out = null;
+ try {
+ out = handler.handleResponse(response);
+ out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ finally {
+ if (out != null) try {
+ out.close(logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ }
+ return ignoredContent;
+ }
+
+ // ------------------------------------------------ Helpers ------------------------------------------------
+
+ private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
+ VisitorOptions.Builder options = VisitorOptions.builder();
+
+ getProperty(request, SELECTION).ifPresent(options::selection);
+ getProperty(request, CONTINUATION).ifPresent(options::continuation);
+ getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
+ getProperty(request, CLUSTER).ifPresent(options::cluster);
+ getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
+ getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
+ .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
+ getProperty(request, CONCURRENCY, numberParser)
+ .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
+
+ return options;
+ }
+
+ static class DocumentPath {
+
+ private final Path path;
+ private final Optional<Group> group;
+
+ DocumentPath(Path path) {
+ this.path = requireNonNull(path);
+ this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
+ .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
+ }
+
+ DocumentId id() {
+ return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
+ ":" + requireNonNull(path.get("documentType")) +
+ ":" + group.map(Group::docIdPart).orElse("") +
+ ":" + requireNonNull(path.get("docid")));
+ }
+
+ String documentType() { return requireNonNull(path.get("documentType")); }
+ String namespace() { return requireNonNull(path.get("namespace")); }
+ Optional<Group> group() { return group; }
+
+ }
+
+ private static Optional<String> getProperty(HttpRequest request, String name) {
+ List<String> values = request.parameters().get(name);
+ if (values != null && values.size() != 0)
+ return Optional.ofNullable(values.get(values.size() - 1));
+
+ return Optional.empty();
+ }
+
+ private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
+ return getProperty(request, name).map(parser::parse);
+ }
+
+
+ @FunctionalInterface
+ interface Parser<T> extends Function<String, T> {
+ default T parse(String value) {
+ try {
+ return apply(value);
+ }
+ catch (RuntimeException e) {
+ throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
+ }
+ }
+ }
+
+
+ @FunctionalInterface
+ interface Handler {
+ ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
+ }
+
+
+ /** Readable content channel which forwards data to a reader when closed. */
+ static class ForwardingContentChannel implements ContentChannel {
+
+ private final ReadableContentChannel delegate = new ReadableContentChannel();
+ private final Consumer<InputStream> reader;
+
+ public ForwardingContentChannel(Consumer<InputStream> reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ delegate.write(buf, handler);
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ delegate.close(handler);
+ try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) {
+ reader.accept(in);
+ }
+ }
+
+ }
+
+
+ static class DocumentOperationParser {
+
+ private static final JsonFactory jsonFactory = new JsonFactory();
+
+ private final DocumentTypeManager manager;
+
+ DocumentOperationParser(DocumentmanagerConfig config) {
+ this.manager = new DocumentTypeManager(config);
+ }
+
+ DocumentPut parsePut(InputStream inputStream, String docId) {
+ return (DocumentPut) parse(inputStream, docId, DocumentParser.SupportedOperation.PUT);
+ }
+
+ DocumentUpdate parseUpdate(InputStream inputStream, String docId) {
+ return (DocumentUpdate) parse(inputStream, docId, DocumentParser.SupportedOperation.UPDATE);
+ }
+
+ private DocumentOperation parse(InputStream inputStream, String docId, DocumentParser.SupportedOperation operation) {
+ return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId);
+ }
+
+ }
+
+ private class MeasuringResponseHandler implements ResponseHandler {
+
+ private final ResponseHandler delegate;
+ private final DocumentOperationType type;
+ private final Instant start;
+
+ private MeasuringResponseHandler(ResponseHandler delegate, DocumentOperationType type, Instant start) {
+ this.delegate = delegate;
+ this.type = type;
+ this.start = start;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ switch (response.getStatus() / 100) {
+ case 2: metrics.reportSuccessful(type, start); break;
+ case 4: metrics.reportFailure(type, DocumentOperationStatus.REQUEST_ERROR); break;
+ case 5: metrics.reportFailure(type, DocumentOperationStatus.SERVER_ERROR); break;
+ }
+ return delegate.handleResponse(response);
+ }
+
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
new file mode 100644
index 00000000000..19f4f50648b
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
@@ -0,0 +1,15 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package=com.yahoo.document.restapi
+
+# Delay before a throttled operation is retried.
+resendDelayMillis int default=100
+
+# Time between a document operation is received and a timeout response is sent
+defaultTimeoutSeconds int default=180
+
+# Time after which a visitor session times out
+visitTimeoutSeconds int default=120
+
+# Bound on number of document operations to keep in retry queue — further operations are rejected
+maxThrottled int default=200
+
diff --git a/vespaclient-container-plugin/src/test/cfg/music.sd b/vespaclient-container-plugin/src/test/cfg/music.sd
new file mode 100644
index 00000000000..a289f5a686b
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/cfg/music.sd
@@ -0,0 +1,6 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+search music {
+ document music {
+ field artist type string { }
+ }
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java
new file mode 100644
index 00000000000..3d350adab87
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java
@@ -0,0 +1,85 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.document.DocumentGet;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.documentapi.DocumentOperationParameters;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author jonmv
+ */
+public class DocumentOperationExecutorMock implements DocumentOperationExecutor {
+
+ final AtomicReference<DocumentOperation> lastOperation = new AtomicReference<>();
+ final AtomicReference<DocumentOperationParameters> lastParameters = new AtomicReference<>();
+ final AtomicReference<OperationContext> lastOperationContext = new AtomicReference<>();
+ final AtomicReference<VisitorOptions> lastOptions = new AtomicReference<>();
+ final AtomicReference<VisitOperationsContext> lastVisitContext = new AtomicReference<>();
+
+ @Override
+ public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(new DocumentGet(id), parameters, context);
+ }
+
+ @Override
+ public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(put, parameters, context);
+ }
+
+ @Override
+ public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(update, parameters, context);
+ }
+
+ @Override
+ public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(new DocumentRemove(id), parameters, context);
+ }
+
+ @Override
+ public void visit(VisitorOptions options, VisitOperationsContext context) {
+ lastOptions.set(options);
+ lastVisitContext.set(context);
+ }
+
+ @Override
+ public String routeToCluster(String cluster) {
+ if ("throw-me".equals(cluster))
+ throw new IllegalArgumentException(cluster);
+
+ return "route-to-" + cluster;
+ }
+
+ public DocumentOperation lastOperation() {
+ return lastOperation.get();
+ }
+
+ public DocumentOperationParameters lastParameters() {
+ return lastParameters.get();
+ }
+
+ public OperationContext lastOperationContext() {
+ return lastOperationContext.get();
+ }
+
+ public VisitorOptions lastOptions() {
+ return lastOptions.get();
+ }
+
+ public VisitOperationsContext lastVisitContext() {
+ return lastVisitContext.get();
+ }
+
+ private void setLastOperation(DocumentOperation operation, DocumentOperationParameters parameters, OperationContext context) {
+ lastOperation.set(operation);
+ lastParameters.set(parameters);
+ lastOperationContext.set(context);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
new file mode 100644
index 00000000000..1d2f6af35dd
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
@@ -0,0 +1,406 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.application.container.DocumentAccesses;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
+import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
+import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl.StorageCluster;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl.DelayQueue;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.local.LocalAsyncSession;
+import com.yahoo.documentapi.local.LocalDocumentAccess;
+import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * This test uses a config definition for the "music" document type, which has a single string field "artist".
+ * One cluster named "content" exists, and can be reached through the "route" route for "music" documents.
+ *
+ * @author jonmv
+ */
+public class DocumentOperationExecutorTest {
+
+ final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder()
+ .cluster("content",
+ new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music",
+ new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace(FixedBucketSpaces.defaultSpace())))
+ .build();
+ final ClusterListConfig clusterConfig = new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder().configid("config-id")
+ .name("content"))
+ .build();
+ final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder()
+ .resendDelayMillis(10)
+ .defaultTimeoutSeconds(1)
+ .maxThrottled(2)
+ .build();
+ final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content",
+ "config-id",
+ Map.of("music", "route")));
+ final List<Document> received = new ArrayList<>();
+ final List<ErrorType> errors = new ArrayList<>();
+ final List<String> messages = new ArrayList<>();
+ final List<String> tokens = new ArrayList<>();
+ ManualClock clock;
+ LocalDocumentAccess access;
+ DocumentOperationExecutorImpl executor;
+ DocumentType musicType;
+ Document doc1;
+ Document doc2;
+ Document doc3;
+
+ OperationContext operationContext() {
+ return new OperationContext((type, error) -> { errors.add(type); messages.add(error); },
+ document -> document.ifPresent(received::add));
+ }
+
+ VisitOperationsContext visitContext() {
+ return new VisitOperationsContext((type, error) -> { errors.add(type); messages.add(error); },
+ token -> token.ifPresent(tokens::add),
+ received::add);
+ }
+
+ LocalAsyncSession session() {
+ return (LocalAsyncSession) executor.asyncSession();
+ }
+
+ @Before
+ public void setUp() {
+ clock = new ManualClock();
+ access = DocumentAccesses.createFromSchemas("src/test/cfg");
+ executor = new DocumentOperationExecutorImpl(clusterConfig, bucketConfig, executorConfig, access, clock);
+ received.clear();
+ errors.clear();
+ tokens.clear();
+
+ musicType = access.getDocumentTypeManager().getDocumentType("music");
+ doc1 = new Document(musicType, "id:ns:music::1"); doc1.setFieldValue("artist", "one");
+ doc2 = new Document(musicType, "id:ns:music:n=1:2"); doc2.setFieldValue("artist", "two");
+ doc3 = new Document(musicType, "id:ns:music:g=a:3");
+ }
+
+ @After
+ public void tearDown() {
+ access.shutdown();
+ }
+
+ @Test
+ public void testResolveCluster() {
+ assertEquals("[Storage:cluster=content;clusterconfigid=config-id]",
+ executor.routeToCluster("content"));
+ try {
+ executor.routeToCluster("blargh");
+ fail("Should not find this cluster");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage());
+ }
+ assertEquals("content", DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), clusters).name());
+ try {
+ DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of());
+ fail("No clusters should fail");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage());
+ }
+ try {
+ Map<String, StorageCluster> twoClusters = new TreeMap<>();
+ twoClusters.put("one", new StorageCluster("one", "one-config", Map.of()));
+ twoClusters.put("two", new StorageCluster("two", "two-config", Map.of()));
+ DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), twoClusters);
+ fail("More than one cluster and no document type should fail");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Please specify one of the content clusters in your Vespa deployment: 'one', 'two'", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testThrottling() throws InterruptedException {
+ executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts.
+ // Put documents 1 and 2 into backend.
+ executor.put(new DocumentPut(doc1), parameters(), operationContext());
+ executor.put(new DocumentPut(doc2), parameters(), operationContext());
+ assertEquals(List.of(doc1, doc2), received);
+
+ session().setResultType(Result.ResultType.TRANSIENT_ERROR);
+
+ // First two are put on retry queue.
+ executor.get(doc1.getId(), parameters(), operationContext());
+ executor.get(doc2.getId(), parameters(), operationContext());
+ assertEquals(List.of(), errors);
+
+ // Third operation is rejected.
+ executor.get(doc3.getId(), parameters(), operationContext());
+ assertEquals(List.of(OVERLOAD), errors);
+
+ // Maintainer does not yet run.
+ executor.notifyMaintainers();
+ // Third operation is rejected again.
+ executor.get(doc3.getId(), parameters(), operationContext());
+ assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
+
+ // Maintainer retries documents, but they're put back into the queue with a new delay.
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers();
+ assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
+
+ session().setResultType(Result.ResultType.SUCCESS);
+ // Maintainer retries documents again, this time successfully.
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers();
+ assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
+ assertEquals(List.of(doc1, doc2, doc1, doc2), received);
+ }
+
+ @Test
+ public void testTimeout() throws InterruptedException {
+ Phaser phaser = new Phaser(1);
+ access.setPhaser(phaser);
+ executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts.
+
+ // Put 1 times out after 1010 ms, Put 2 succeeds after 1010 ms
+ executor.put(new DocumentPut(doc1), parameters(), operationContext());
+ clock.advance(Duration.ofMillis(20));
+ executor.put(new DocumentPut(doc2), parameters(), operationContext());
+ executor.notifyMaintainers();
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(), received);
+
+ clock.advance(Duration.ofMillis(990));
+ executor.notifyMaintainers(); // Let doc1 time out.
+ phaser.arriveAndAwaitAdvance(); // Let doc2 arrive.
+ phaser.arriveAndAwaitAdvance(); // Wait for responses to be delivered.
+ assertEquals(List.of(TIMEOUT), errors);
+ assertEquals(List.of(doc2), received);
+
+ session().setResultType(Result.ResultType.TRANSIENT_ERROR);
+ executor.put(new DocumentPut(doc3), parameters(), operationContext());
+ clock.advance(Duration.ofMillis(990));
+ executor.notifyMaintainers(); // Retry throttled operation.
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers(); // Time out throttled operation.
+ assertEquals(List.of(TIMEOUT, TIMEOUT), errors);
+ assertEquals(List.of(doc2), received);
+
+ session().setResultType(Result.ResultType.SUCCESS);
+ clock.advance(Duration.ofMillis(20));
+ executor.notifyMaintainers(); // Retry not attempted since operation already timed out.
+ phaser.arriveAndAwaitAdvance();
+ phaser.arriveAndAwaitAdvance();
+ assertEquals(List.of(TIMEOUT, TIMEOUT), errors);
+ assertEquals(List.of(doc2), received);
+ }
+
+ @Test
+ public void testCallback() {
+ AtomicBoolean called = new AtomicBoolean();
+ executor.get(doc1.getId(), parameters().withResponseHandler(__ -> called.set(true)), operationContext());
+ assertTrue(called.get());
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(), received);
+ }
+
+ @Test
+ public void testVisit() throws InterruptedException {
+ executor.put(new DocumentPut(doc1), parameters(), operationContext());
+ executor.put(new DocumentPut(doc2), parameters(), operationContext());
+ executor.put(new DocumentPut(doc3), parameters(), operationContext());
+ assertEquals(doc1, received.remove(0));
+ assertEquals(doc2, received.remove(0));
+ assertEquals(doc3, received.remove(0));
+
+ // No cluster or document type set.
+ executor.visit(VisitorOptions.builder()
+ .build(),
+ visitContext());
+ assertEquals("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level", messages.remove(0));
+ assertEquals(BAD_REQUEST, errors.remove(0));
+ assertEquals(List.of(), received);
+
+ // Cluster not found.
+ executor.visit(VisitorOptions.builder()
+ .cluster("blargh")
+ .build(),
+ visitContext());
+ assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", messages.remove(0));
+ assertEquals(BAD_REQUEST, errors.remove(0));
+ assertEquals(List.of(), received);
+
+ // Matches doc2 for user 1.
+ executor.visit(VisitorOptions.builder()
+ .cluster("content")
+ .group(Group.of(1))
+ .build(),
+ visitContext());
+ for (VisitorControlHandler session : executor.visitorSessions()) {
+ session.waitUntilDone();
+ }
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(doc2, received.remove(0));
+
+ // Matches documents in namespace ns of type music in group a.
+ executor.visit(VisitorOptions.builder()
+ .concurrency(2)
+ .wantedDocumentCount(3)
+ .namespace("ns")
+ .documentType("music")
+ .fieldSet("music:artist")
+ .group(Group.of("a"))
+ .build(),
+ visitContext());
+ for (VisitorControlHandler session : executor.visitorSessions())
+ session.waitUntilDone();
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(doc3, received.remove(0));
+
+ // Matches documents with non-empty artist field.
+ executor.visit(VisitorOptions.builder()
+ .cluster("content")
+ .selection("music.artist")
+ .fieldSet("[id]")
+ .build(),
+ visitContext());
+ for (VisitorControlHandler session : executor.visitorSessions())
+ session.waitUntilDone();
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(doc1.getId(), doc2.getId()), List.of(received.remove(0).getId(), received.remove(0).getId()));
+
+ // Matches all documents, but we'll shut down midway.
+ Phaser phaser = new Phaser(1);
+ access.setPhaser(phaser);
+ executor.visit(VisitorOptions.builder()
+ .cluster("content")
+ .bucketSpace("global")
+ .build(),
+ visitContext());
+ phaser.arriveAndAwaitAdvance(); // First document pending
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread shutdownThread = new Thread(() -> {
+ executor.shutdown();
+ latch.countDown();
+ });
+ shutdownThread.start();
+ clock.advance(Duration.ofMillis(100));
+ executor.notifyMaintainers(); // Purge timeout operations so maintainers can shut down quickly.
+ latch.await(); // Make sure visit session is shut down before next document is considered.
+ phaser.awaitAdvance(phaser.arriveAndDeregister()); // See above.
+ for (VisitorControlHandler session : executor.visitorSessions()) {
+ session.waitUntilDone();
+ }
+ assertEquals(List.of(), messages);
+ assertEquals(List.of(), errors);
+ assertEquals(List.of(doc1), received);
+ }
+
+ @Test
+ public void testDelayQueue() throws ExecutionException, InterruptedException, TimeoutException {
+ Supplier<Result> nullOperation = () -> null;
+ AtomicLong counter1 = new AtomicLong(0);
+ AtomicLong counter2 = new AtomicLong(0);
+ AtomicLong counter3 = new AtomicLong(0);
+ AtomicBoolean throttle = new AtomicBoolean(true);
+ OperationContext context1 = new OperationContext((type, message) -> counter1.decrementAndGet(), doc -> counter1.incrementAndGet());
+ OperationContext context2 = new OperationContext((type, message) -> counter2.decrementAndGet(), doc -> counter2.incrementAndGet());
+ OperationContext context3 = new OperationContext((type, message) -> counter3.decrementAndGet(), doc -> counter3.incrementAndGet());
+ DelayQueue queue = new DelayQueue(3,
+ (operation, context) -> {
+ if (throttle.get())
+ return false;
+
+ context.success(Optional.empty());
+ return true;
+ },
+ Duration.ofMillis(30),
+ clock,
+ "test");
+ synchronized (queue) { queue.notify(); queue.wait(); } // Make sure maintainers have gone to wait before test starts.
+
+ // Add three operations:
+ //  the first shall be handled by the queue on second attempt,
+ // the second by an external call,and
+ // the third during shutdown — added later.
+ assertTrue(queue.add(nullOperation, context1));
+ clock.advance(Duration.ofMillis(20));
+ assertTrue(queue.add(nullOperation, context2));
+ assertTrue(queue.add(nullOperation, context3));
+ assertFalse("New entries should be rejected by a full queue", queue.add(nullOperation, context3));
+ assertEquals(3, queue.size());
+ assertEquals(0, counter1.get());
+ assertEquals(0, counter2.get());
+ assertEquals(0, counter3.get());
+
+ context2.error(ERROR, "error"); // Marks this as handled, ready to be evicted.
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer does not run yet, as it's not yet time.
+ assertEquals(0, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(0, counter3.get());
+ assertEquals(3, queue.size());
+
+ clock.advance(Duration.ofMillis(15));
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer now runs, failing to handle first and evicting second entry.
+ assertEquals(0, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(0, counter3.get());
+ assertEquals(2, queue.size());
+
+ throttle.set(false);
+ clock.advance(Duration.ofMillis(15));
+ synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer runs again, successfully handling first entry.
+ assertEquals(1, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(0, counter3.get());
+ assertEquals(1, queue.size());
+
+ queue.shutdown(Duration.ZERO, context -> context.error(ERROR, "shutdown"))
+ .get(1, TimeUnit.SECONDS);
+ assertEquals(1, counter1.get());
+ assertEquals(-1, counter2.get());
+ assertEquals(-1, counter3.get());
+ assertEquals(0, queue.size());
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
new file mode 100644
index 00000000000..9554fa0a913
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -0,0 +1,373 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi.resource;
+
+import com.yahoo.container.jdisc.RequestHandlerTestDriver;
+import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentGet;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorMock;
+import com.yahoo.document.restapi.resource.DocumentV1ApiHandler.DocumentOperationParser;
+import com.yahoo.document.update.FieldUpdate;
+import com.yahoo.documentapi.DocumentAccessParams;
+import com.yahoo.documentapi.local.LocalDocumentAccess;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.searchdefinition.derived.Deriver;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.JsonFormat;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.test.ManualClock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
+import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
+import static com.yahoo.jdisc.http.HttpRequest.Method.PATCH;
+import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
+import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author jonmv
+ */
+public class DocumentV1ApiTest {
+
+ final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd").build();
+ final DocumentTypeManager manager = new DocumentTypeManager(docConfig);
+ final Document doc1 = new Document(manager.getDocumentType("music"), "id:space:music::one");
+ final Document doc2 = new Document(manager.getDocumentType("music"), "id:space:music:n=1:two");
+ final Document doc3 = new Document(manager.getDocumentType("music"), "id:space:music:g=a:three");
+ {
+ doc1.setFieldValue("artist", "Tom Waits");
+ doc2.setFieldValue("artist", "Asa-Chan & Jun-Ray");
+ }
+
+ ManualClock clock;
+ DocumentOperationParser parser;
+ LocalDocumentAccess access;
+ DocumentOperationExecutorMock executor;
+ Metric metric;
+ MetricReceiver metrics;
+ DocumentV1ApiHandler handler;
+
+ @Before
+ public void setUp() {
+ clock = new ManualClock();
+ parser = new DocumentOperationParser(docConfig);
+ access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(docConfig));
+ executor = new DocumentOperationExecutorMock();
+ metric = new NullMetric();
+ metrics = new MetricReceiver.MockReceiver();
+ handler = new DocumentV1ApiHandler(clock, executor, parser, metric, metrics);
+ }
+
+ @After
+ public void tearDown() {
+ handler.destroy();
+ }
+
+ @Test
+ public void testResponses() {
+ RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
+ // GET at non-existent path returns 404 with available paths
+ var response = driver.sendRequest("http://localhost/document/v1/not-found");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/not-found\"," +
+ " \"message\": \"Nothing at '/document/v1/not-found'. Available paths are:\\n" +
+ "/document/v1/\\n" +
+ "/document/v1/{namespace}/{documentType}/docid/\\n" +
+ "/document/v1/{namespace}/{documentType}/group/{group}/\\n" +
+ "/document/v1/{namespace}/{documentType}/number/{number}/\\n" +
+ "/document/v1/{namespace}/{documentType}/docid/{docid}\\n" +
+ "/document/v1/{namespace}/{documentType}/group/{group}/{docid}\\n" +
+ "/document/v1/{namespace}/{documentType}/number/{number}/{docid}\"" +
+ "}",
+ response.readAll());
+ assertEquals("application/json; charset=UTF-8", response.getResponse().headers().getFirst("Content-Type"));
+ assertEquals(404, response.getStatus());
+
+ // GET at root is a visit. Numeric parameters have an upper bound.
+ response = driver.sendRequest("http://localhost/document/v1?cluster=lackluster&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
+ "&selection=all%20the%20things&fieldSet=[id]&continuation=token");
+ executor.lastVisitContext().document(doc1);
+ executor.lastVisitContext().document(doc2);
+ executor.lastVisitContext().document(doc3);
+ executor.lastVisitContext().success(Optional.of("token"));
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1\"," +
+ " \"documents\": [" +
+ " {" +
+ " \"id\": \"id:space:music::one\"," +
+ " \"fields\": {" +
+ " \"artist\": \"Tom Waits\"" +
+ " }" +
+ " }," +
+ " {" +
+ " \"id\": \"id:space:music:n=1:two\"," +
+ " \"fields\": {" +
+ " \"artist\": \"Asa-Chan & Jun-Ray\"" +
+ " }" +
+ " }," +
+ " {" +
+ " \"id\": \"id:space:music:g=a:three\"," +
+ " \"fields\": {}" +
+ " }" +
+ " ]," +
+ " \"continuation\": \"token\"" +
+ "}",
+ response.readAll());
+ assertEquals(200, response.getStatus());
+ assertEquals(VisitorOptions.builder().cluster("lackluster").bucketSpace("default").wantedDocumentCount(1024)
+ .concurrency(100).selection("all the things").fieldSet("[id]").continuation("token").build(),
+ executor.lastOptions());
+
+ // GET with namespace and document type is a restricted visit.
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid");
+ executor.lastVisitContext().error(BAD_REQUEST, "nope");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid\"," +
+ " \"documents\": []," +
+ " \"message\": \"nope\"" +
+ "}",
+ response.readAll());
+ assertEquals(400, response.getStatus());
+ assertEquals(VisitorOptions.builder().namespace("space").documentType("music").build(),
+ executor.lastOptions());
+
+ // GET with namespace, document type and group is a restricted visit.
+ response = driver.sendRequest("http://localhost/document/v1/space/music/group/best");
+ executor.lastVisitContext().error(ERROR, "error");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/group/best\"," +
+ " \"documents\": []," +
+ " \"message\": \"error\"" +
+ "}",
+ response.readAll());
+ assertEquals(500, response.getStatus());
+ assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of("best")).build(),
+ executor.lastOptions());
+
+ // GET with namespace, document type and number is a restricted visit.
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/123");
+ executor.lastVisitContext().success(Optional.empty());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/123\"," +
+ " \"documents\": []" +
+ "}",
+ response.readAll());
+ assertEquals(200, response.getStatus());
+ assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of(123)).build(),
+ executor.lastOptions());
+
+ // GET with full document ID is a document get operation which returns 404 when no document is found
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=lackluster&fieldSet=go");
+ executor.lastOperationContext().success(Optional.empty());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid/one\"," +
+ " \"id\": \"id:space:music::one\"" +
+ "}",
+ response.readAll());
+ assertEquals(404, response.getStatus());
+ assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation());
+ assertEquals(parameters().withRoute("route-to-lackluster").withFieldSet("go"), executor.lastParameters());
+
+ // GET with full document ID is a document get operation.
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?");
+ executor.lastOperationContext().success(Optional.of(doc1));
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid/one\"," +
+ " \"id\": \"id:space:music::one\"," +
+ " \"fields\": {" +
+ " \"artist\": \"Tom Waits\"" +
+ " }" +
+ "}",
+ response.readAll());
+ assertEquals(200, response.getStatus());
+ assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation());
+ assertEquals(parameters(), executor.lastParameters());
+
+ // GET with not encoded / in user specified part of document id is a 404
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one/two/three");
+ response.readAll(); // Must drain body.
+ assertEquals(404, response.getStatus());
+
+ // POST with a document payload is a document put operation.
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST,
+ "{" +
+ " \"fields\": {" +
+ " \"artist\": \"Asa-Chan & Jun-Ray\"" +
+ " }" +
+ "}");
+ executor.lastOperationContext().success(Optional.empty());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"id\": \"id:space:music:n=1:two\"" +
+ "}",
+ response.readAll());
+ assertEquals(200, response.getStatus());
+ DocumentPut put = new DocumentPut(doc2);
+ put.setCondition(new TestAndSetCondition("test it"));
+ assertEquals(put, executor.lastOperation());
+ assertEquals(parameters(), executor.lastParameters());
+
+ // PUT with a document update payload is a document update operation.
+ response = driver.sendRequest("http://localhost/document/v1/space/music/group/a/three?create=true", PUT,
+ "{" +
+ " \"fields\": {" +
+ " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" +
+ " }" +
+ "}");
+ executor.lastOperationContext().success(Optional.empty());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/group/a/three\"," +
+ " \"id\": \"id:space:music:g=a:three\"" +
+ "}",
+ response.readAll());
+ DocumentUpdate update = new DocumentUpdate(doc3.getDataType(), doc3.getId());
+ update.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl")));
+ update.setCreateIfNonExistent(true);
+ assertEquals(update, executor.lastOperation());
+ assertEquals(parameters(), executor.lastParameters());
+ assertEquals(200, response.getStatus());
+
+ // POST with illegal payload is a 400
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST,
+ "{" +
+ " ┻━┻︵ \\(°□°)/ ︵ ┻━┻" +
+ "}");
+ Inspector responseRoot = SlimeUtils.jsonToSlime(response.readAll()).get();
+ assertEquals("/document/v1/space/music/number/1/two", responseRoot.field("pathId").asString());
+ assertTrue(responseRoot.field("message").asString().startsWith("Unexpected character ('┻' (code 9531 / 0x253b)): was expecting double-quote to start field name"));
+ assertEquals(400, response.getStatus());
+
+ // PUT on a unknown document type is a 400
+ response = driver.sendRequest("http://localhost/document/v1/space/house/group/a/three?create=true", PUT,
+ "{" +
+ " \"fields\": {" +
+ " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" +
+ " }" +
+ "}");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/house/group/a/three\"," +
+ " \"message\": \"Document type house does not exist\"" +
+ "}",
+ response.readAll());
+ assertEquals(400, response.getStatus());
+
+ // DELETE with full document ID is a document remove operation.
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?route=route", DELETE);
+ executor.lastOperationContext().success(Optional.empty());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"id\": \"id:space:music:n=1:two\"" +
+ "}",
+ response.readAll());
+ assertEquals(200, response.getStatus());
+ assertEquals(new DocumentRemove(doc2.getId()), executor.lastOperation());
+ assertEquals(parameters().withRoute("route"), executor.lastParameters());
+
+ // GET with non-existent cluster is a 400
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?cluster=throw-me");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"throw-me\"" +
+ "}",
+ response.readAll());
+ assertEquals(400, response.getStatus());
+
+ // TIMEOUT is a 504
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
+ executor.lastOperationContext().error(TIMEOUT, "timeout");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"id\": \"id:space:music:n=1:two\"," +
+ " \"message\": \"timeout\"" +
+ "}",
+ response.readAll());
+ assertEquals(504, response.getStatus());
+
+ // OVERLOAD is a 429
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
+ executor.lastOperationContext().error(OVERLOAD, "overload");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"id\": \"id:space:music:n=1:two\"," +
+ " \"message\": \"overload\"" +
+ "}",
+ response.readAll());
+ assertEquals(429, response.getStatus());
+
+ // PRECONDITION_FAILED is a 412
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
+ executor.lastOperationContext().error(PRECONDITION_FAILED, "no dice");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"id\": \"id:space:music:n=1:two\"," +
+ " \"message\": \"no dice\"" +
+ "}",
+ response.readAll());
+ assertEquals(412, response.getStatus());
+
+ // Client close during processing gives empty body
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
+ response.clientClose();
+ executor.lastOperationContext().error(TIMEOUT, "no dice");
+ assertEquals("", response.readAll());
+ assertEquals(504, response.getStatus());
+
+ // OPTIONS gets options
+ response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", OPTIONS);
+ assertEquals("", response.readAll());
+ assertEquals(204, response.getStatus());
+ assertEquals("GET,POST,PUT,DELETE", response.getResponse().headers().getFirst("Allow"));
+
+ // PATCH is not allowed
+ response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", PATCH);
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/docid/one\"," +
+ " \"message\": \"'PATCH' not allowed at '/document/v1/space/music/docid/one'. Allowed methods are: GET, POST, PUT, DELETE\"" +
+ "}",
+ response.readAll());
+ assertEquals(405, response.getStatus());
+
+ driver.close();
+ }
+
+ void assertSameJson(String expected, String actual) {
+ ByteArrayOutputStream expectedPretty = new ByteArrayOutputStream();
+ ByteArrayOutputStream actualPretty = new ByteArrayOutputStream();
+ JsonFormat formatter = new JsonFormat(false);
+ try {
+ formatter.encode(actualPretty, SlimeUtils.jsonToSlimeOrThrow(actual));
+ formatter.encode(expectedPretty, SlimeUtils.jsonToSlimeOrThrow(expected));
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ assertEquals(expectedPretty.toString(UTF_8), actualPretty.toString(UTF_8));
+ }
+
+}