aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java54
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java32
-rw-r--r--documentapi/abi-spec.json15
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java136
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java6
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java7
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java66
-rw-r--r--vespaclient-container-plugin/pom.xml6
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java704
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java603
-rw-r--r--vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def15
11 files changed, 1538 insertions, 106 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java b/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java
new file mode 100644
index 00000000000..cfb48339dbe
--- /dev/null
+++ b/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java
@@ -0,0 +1,54 @@
+package com.yahoo.container.core;
+
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.application.BindingMatch;
+import com.yahoo.jdisc.application.UriPattern;
+import com.yahoo.jdisc.handler.ResponseHandler;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Common HTTP request handler metrics code.
+ *
+ * @author jonmv
+ */
+public class HandlerMetricContextUtil {
+
+ public static void onHandle(Request request, Metric metric, Class<?> handlerClass) {
+ metric.add("handled.requests", 1, contextFor(request, metric, handlerClass));
+ }
+
+ public static void onHandled(Request request, Metric metric, Class<?> handlerClass) {
+ metric.set("handled.latency", request.timeElapsed(TimeUnit.MILLISECONDS), contextFor(request, metric, handlerClass));
+ }
+
+ public static Metric.Context contextFor(Request request, Metric metric, Class<?> handlerClass) {
+ return contextFor(request, Map.of(), metric, handlerClass);
+ }
+
+ public static Metric.Context contextFor(Request request, Map<String, String> extraDimensions, Metric metric, Class<?> handlerClass) {
+ BindingMatch<?> match = request.getBindingMatch();
+ if (match == null) return null;
+ UriPattern matched = match.matched();
+ if (matched == null) return null;
+ String name = matched.toString();
+ String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null;
+
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put("handler", name);
+ if (endpoint != null) {
+ dimensions.put("endpoint", endpoint);
+ }
+ URI uri = request.getUri();
+ dimensions.put("scheme", uri.getScheme());
+ dimensions.put("port", Integer.toString(uri.getPort()));
+ dimensions.put("handler-name", handlerClass.getName());
+ dimensions.putAll(extraDimensions);
+ return metric.createContext(dimensions);
+ }
+
+}
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
index ab768dba0d2..f5f8b428535 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
@@ -6,8 +6,6 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.application.BindingMatch;
-import com.yahoo.jdisc.application.UriPattern;
import com.yahoo.jdisc.handler.AbstractRequestHandler;
import com.yahoo.jdisc.handler.BufferedContentChannel;
import com.yahoo.jdisc.handler.ContentChannel;
@@ -15,10 +13,9 @@ import com.yahoo.jdisc.handler.OverloadException;
import com.yahoo.jdisc.handler.ReadableContentChannel;
import com.yahoo.jdisc.handler.ResponseDispatch;
import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.container.core.HandlerMetricContextUtil;
-import java.net.URI;
import java.time.Duration;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
@@ -79,29 +76,9 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
}
Metric.Context contextFor(Request request, Map<String, String> extraDimensions) {
- BindingMatch<?> match = request.getBindingMatch();
- if (match == null) return null;
- UriPattern matched = match.matched();
- if (matched == null) return null;
- String name = matched.toString();
- String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null;
-
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put("handler", name);
- if (endpoint != null) {
- dimensions.put("endpoint", endpoint);
- }
- URI uri = request.getUri();
- dimensions.put("scheme", uri.getScheme());
- dimensions.put("port", Integer.toString(uri.getPort()));
- String handlerClassName = getClass().getName();
- dimensions.put("handler-name", handlerClassName);
- dimensions.putAll(extraDimensions);
- return this.metric.createContext(dimensions);
+ return HandlerMetricContextUtil.contextFor(request, extraDimensions, metric, getClass());
}
- private Metric.Context contextFor(Request request) { return contextFor(request, Map.of()); }
-
/**
* Handles a request by assigning a worker thread to it.
*
@@ -109,7 +86,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
*/
@Override
public final ContentChannel handleRequest(Request request, ResponseHandler responseHandler) {
- metric.add("handled.requests", 1, contextFor(request));
+ HandlerMetricContextUtil.onHandle(request, metric, getClass());
if (request.getTimeout(TimeUnit.SECONDS) == null) {
Duration timeout = getTimeout();
if (timeout != null) {
@@ -212,8 +189,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
public ContentChannel handleResponse(Response response) {
if ( tryHasResponded()) throw new IllegalStateException("Response already handled");
ContentChannel cc = responseHandler.handleResponse(response);
- long millis = request.timeElapsed(TimeUnit.MILLISECONDS);
- metric.set("handled.latency", millis, contextFor(request));
+ HandlerMetricContextUtil.onHandled(request, metric, getClass());
return cc;
}
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index f5f2a7c1845..c8cbc978a8f 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -979,7 +979,9 @@
"public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
"public com.yahoo.documentapi.Response getNext()",
"public com.yahoo.documentapi.Response getNext(int)",
- "public void destroy()"
+ "public void destroy()",
+ "public void setPhaser(java.util.concurrent.Phaser)",
+ "public void setResultType(com.yahoo.documentapi.Result$ResultType)"
],
"fields": []
},
@@ -991,12 +993,15 @@
],
"methods": [
"public void <init>(com.yahoo.documentapi.DocumentAccessParams)",
- "public com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
- "public com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
- "public com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
+ "public com.yahoo.documentapi.local.LocalSyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
+ "public com.yahoo.documentapi.local.LocalAsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
+ "public com.yahoo.documentapi.local.LocalVisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
"public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)",
"public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)",
- "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)"
+ "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)",
+ "public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
+ "public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
+ "public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)"
],
"fields": []
},
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 40f26a82a89..ff3eeb02a71 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -20,29 +20,36 @@ import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.UpdateResponse;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static com.yahoo.documentapi.Result.ResultType.SUCCESS;
/**
* @author bratseth
+ * @author jonmv
*/
public class LocalAsyncSession implements AsyncSession {
- private final List<Response> responses = new LinkedList<>();
+ private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
private final ResponseHandler handler;
private final SyncSession syncSession;
- private long requestId = 0;
- private Random random = new Random();
+ private final Executor executor = Executors.newCachedThreadPool();
- private synchronized long getNextRequestId() {
- requestId++;
- return requestId;
- }
+ private AtomicLong requestId = new AtomicLong(0);
+ private AtomicReference<Phaser> phaser = new AtomicReference<>();
+ private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
- random.setSeed(System.currentTimeMillis());
syncSession = access.createSyncSession(new SyncParameters.Builder().build());
}
@@ -58,14 +65,15 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- syncSession.put(documentPut, pri);
- addResponse(new DocumentResponse(req, documentPut.getDocument()));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ return send(req -> {
+ try {
+ syncSession.put(documentPut, pri);
+ return new DocumentResponse(req, documentPut.getDocument());
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
+ }
+ });
}
@Override
@@ -81,13 +89,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result get(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- addResponse(new DocumentResponse(req, syncSession.get(id)));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ return send(req -> {
+ try {
+ return new DocumentResponse(req, syncSession.get(id));
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
+ }
+ });
}
@Override
@@ -97,13 +106,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.remove(new DocumentRemove(id), pri)) {
- addResponse(new RemoveResponse(req, true));
- } else {
- addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ return send(req -> {
+ if (syncSession.remove(new DocumentRemove(id), pri)) {
+ return new RemoveResponse(req, true);
+ }
+ else {
+ return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ });
}
@Override
@@ -113,27 +123,24 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.update(update, pri)) {
- addResponse(new UpdateResponse(req, true));
- } else {
- addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ return send(req -> {
+ if (syncSession.update(update, pri)) {
+ return new UpdateResponse(req, true);
+ }
+ else {
+ return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ });
}
@Override
public Response getNext() {
- if (responses.isEmpty()) {
- return null;
- }
- int index = random.nextInt(responses.size());
- return responses.remove(index);
+ return responses.poll();
}
@Override
- public Response getNext(int timeout) {
- return getNext();
+ public Response getNext(int timeoutMilliseconds) throws InterruptedException {
+ return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS);
}
@Override
@@ -141,6 +148,22 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
+ /**
+ * When this is set, every operation is sent in a separate thread, which first registers with the given phaser,
+ * and then arrives and awaits advance so the user can trigger responses. After the response is delivered,
+ * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered.
+ *
+ * If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
+ */
+ public void setPhaser(Phaser phaser) {
+ this.phaser.set(phaser);
+ }
+
+ /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
+ public void setResultType(Result.ResultType resultType) {
+ this.result.set(resultType);
+ }
+
private void addResponse(Response response) {
if (handler != null) {
handler.handleResponse(response);
@@ -149,4 +172,23 @@ public class LocalAsyncSession implements AsyncSession {
}
}
+ private Result send(Function<Long, Response> responses) {
+ Result.ResultType resultType = result.get();
+ if (resultType != SUCCESS)
+ return new Result(resultType, new Error());
+
+ long req = requestId.incrementAndGet();
+ Phaser synchronizer = phaser.get();
+ if (synchronizer == null)
+ addResponse(responses.apply(req));
+ else
+ executor.execute(() -> {
+ synchronizer.register();
+ synchronizer.arriveAndAwaitAdvance();
+ addResponse(responses.apply(req));
+ synchronizer.arriveAndDeregister();
+ });
+ return new Result(req);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index c69a8fb48de..e24853b9294 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess {
}
@Override
- public SyncSession createSyncSession(SyncParameters parameters) {
+ public LocalSyncSession createSyncSession(SyncParameters parameters) {
return new LocalSyncSession(this);
}
@Override
- public AsyncSession createAsyncSession(AsyncParameters parameters) {
+ public LocalAsyncSession createAsyncSession(AsyncParameters parameters) {
return new LocalAsyncSession(parameters, this);
}
@Override
- public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
+ public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
return new LocalVisitorSession(this, parameters);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index f087b646ca4..85be1c11fcd 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -76,8 +76,13 @@ public class LocalVisitorSession implements VisitorSession {
if (state.get() != State.RUNNING)
return;
- if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ try {
+ if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ return;
+ }
+ catch (RuntimeException e) {
return;
+ }
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 69dc7c6da74..33cae60ab93 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -36,6 +36,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -85,17 +91,22 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
}
@Test
- public void testAsyncFetch() {
- AsyncSession session = access.createAsyncSession(new AsyncParameters());
+ public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException {
+ LocalAsyncSession session = access.createAsyncSession(new AsyncParameters());
List<DocumentId> ids = new ArrayList<>();
ids.add(new DocumentId("id:music:music::1"));
ids.add(new DocumentId("id:music:music::2"));
ids.add(new DocumentId("id:music:music::3"));
for (DocumentId id : ids)
session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
- int timeout = 100;
+
+ // Let all async operations wait for a signal from the test thread before sending their responses, and let test
+ // thread wait for all responses to be delivered afterwards.
+ Phaser phaser = new Phaser(1);
+ session.setPhaser(phaser);
long startTime = System.currentTimeMillis();
+ int timeoutMillis = 1000;
Set<Long> outstandingRequests = new HashSet<>();
for (DocumentId id : ids) {
Result result = session.get(id);
@@ -104,27 +115,38 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
outstandingRequests.add(result.getRequestId());
}
- List<Document> documents = new ArrayList<>();
- try {
- while ( ! outstandingRequests.isEmpty()) {
- int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
- Response response = session.getNext(timeout - timeSinceStart);
- if (response == null)
- throw new RuntimeException("Timed out waiting for documents"); // or return what you have
- if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
-
- if (response.isSuccess())
- documents.add(((DocumentResponse)response).getDocument());
- outstandingRequests.remove(response.getRequestId());
+ // Wait for responses in separate thread.
+ Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ List<Document> documents = new ArrayList<>();
+ while ( ! outstandingRequests.isEmpty()) {
+ int timeSinceStart = (int) (System.currentTimeMillis() - startTime);
+ Response response = session.getNext(timeoutMillis - timeSinceStart);
+ if (response == null)
+ throw new RuntimeException("Timed out waiting for documents"); // or return what you have
+ if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
+
+ if (response.isSuccess())
+ documents.add(((DocumentResponse) response).getDocument());
+ outstandingRequests.remove(response.getRequestId());
+ }
+ assertEquals(3, documents.size());
+ for (Document document : documents)
+ assertNotNull(document);
}
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for documents", e);
- }
+ catch (InterruptedException e) {
+ throw new IllegalArgumentException("Interrupted while waiting for responses");
+ }
+ });
+
+ // All operations, and receiver, now waiting for this thread to arrive.
+ assertEquals(4, phaser.getRegisteredParties());
+ assertEquals(0, phaser.getPhase());
+ phaser.arrive();
+ assertEquals(1, phaser.getPhase());
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
- assertEquals(3, documents.size());
- for (Document document : documents)
- assertNotNull(document);
+ futureWithAssertions.get(1000, TimeUnit.MILLISECONDS);
}
@Test
diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml
index 9c4b81da806..8254c208588 100644
--- a/vespaclient-container-plugin/pom.xml
+++ b/vespaclient-container-plugin/pom.xml
@@ -72,6 +72,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>testutil</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
new file mode 100644
index 00000000000..b675af3b564
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
@@ -0,0 +1,704 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.DocumentResponse;
+import com.yahoo.documentapi.DumpVisitorDataHandler;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.text.Text;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.SEVERE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Encapsulates a document access and supports running asynchronous document
+ * operations and visits against this, with retries and optional timeouts.
+ *
+ * @author jonmv
+ */
+public class DocumentOperationExecutor {
+
+ private static final Logger log = Logger.getLogger(DocumentOperationExecutor.class.getName());
+
+ private final Duration visitTimeout;
+ private final long maxThrottled;
+ private final DocumentAccess access;
+ private final AsyncSession asyncSession;
+ private final Map<String, StorageCluster> clusters;
+ private final Clock clock;
+ private final DelayQueue throttled;
+ private final DelayQueue timeouts;
+ private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
+ private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
+
+ public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
+ DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
+ this(Duration.ofMillis(executorConfig.resendDelayMillis()),
+ Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
+ Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
+ executorConfig.maxThrottled(),
+ access,
+ parseClusters(clustersConfig, bucketsConfig),
+ clock);
+ }
+
+ DocumentOperationExecutor(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
+ DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
+ this.visitTimeout = requireNonNull(visitTimeout);
+ this.maxThrottled = maxThrottled;
+ this.access = requireNonNull(access);
+ this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
+ this.clock = requireNonNull(clock);
+ this.clusters = Map.copyOf(clusters);
+ this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
+ this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
+ }
+
+ /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
+ public void shutdown() {
+ long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
+ Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
+ context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
+ Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
+ context -> context.error(TIMEOUT, "Timed out due to shutdown"));
+ visits.values().forEach(VisitorSession::destroy);
+ try {
+ throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
+ }
+ }
+
+ public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.get(id, parameters), context);
+ }
+
+ public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.put(put, parameters), context);
+ }
+
+ public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.update(update, parameters), context);
+ }
+
+ public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.remove(id, parameters), context);
+ }
+
+ public void visit(VisitorOptions options, VisitOperationsContext context) {
+ try {
+ AtomicBoolean done = new AtomicBoolean(false);
+ VisitorParameters parameters = options.asParameters(clusters, visitTimeout);
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
+ @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
+ });
+ parameters.setControlHandler(new VisitorControlHandler() {
+ @Override public void onDone(CompletionCode code, String message) {
+ super.onDone(code, message);
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets())
+ context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
+ case SUCCESS: // intentional fallthrough
+ case ABORTED:
+ context.success(Optional.ofNullable(getProgress())
+ .filter(progress -> ! progress.isFinished())
+ .map(ProgressToken::serializeToString));
+ break;
+ default:
+ context.error(ERROR, message != null ? message : "Visiting failed");
+ }
+ done.set(true); // This may be reached before dispatching thread is done putting us in the map.
+ visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
+ }
+ });
+ visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ if (done.get())
+ visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
+ }
+ catch (IllegalArgumentException | ParseException e) {
+ context.error(BAD_REQUEST, Exceptions.toMessageString(e));
+ }
+ catch (RuntimeException e) {
+ context.error(ERROR, Exceptions.toMessageString(e));
+ }
+ }
+
+ public String routeToCluster(String cluster) {
+ return resolveCluster(Optional.of(cluster), clusters).route();
+ }
+
+
+ public enum ErrorType {
+ OVERLOAD,
+ NOT_FOUND,
+ PRECONDITION_FAILED,
+ BAD_REQUEST,
+ TIMEOUT,
+ ERROR;
+ }
+
+
+ /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
+ public static class VisitOperationsContext extends Context<Optional<String>> {
+
+ private final Consumer<Document> onDocument;
+
+ public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) {
+ super(onError, onSuccess);
+ this.onDocument = onDocument;
+ }
+
+ void document(Document document) {
+ if ( ! handled())
+ onDocument.accept(document);
+ }
+
+ }
+
+
+ /** Context for a document operation. */
+ public static class OperationContext extends Context<Optional<Document>> {
+
+ public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
+ super(onError, onSuccess);
+ }
+
+ }
+
+
+ public static class VisitorOptions {
+
+ private final Optional<String> cluster;
+ private final Optional<String> namespace;
+ private final Optional<String> documentType;
+ private final Optional<Group> group;
+ private final Optional<String> selection;
+ private final Optional<String> fieldSet;
+ private final Optional<String> continuation;
+ private final Optional<String> bucketSpace;
+ private final Optional<Integer> wantedDocumentCount;
+ private final Optional<Integer> concurrency;
+
+ private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
+ Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
+ Optional<String> continuation,Optional<String> bucketSpace,
+ Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
+ this.cluster = cluster;
+ this.namespace = namespace;
+ this.documentType = documentType;
+ this.group = group;
+ this.selection = selection;
+ this.fieldSet = fieldSet;
+ this.continuation = continuation;
+ this.bucketSpace = bucketSpace;
+ this.wantedDocumentCount = wantedDocumentCount;
+ this.concurrency = concurrency;
+ }
+
+ private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) {
+ if (cluster.isEmpty() && documentType.isEmpty())
+ throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+
+ VisitorParameters parameters = new VisitorParameters(Stream.of(selection,
+ documentType,
+ namespace.map(value -> "id.namespace=='" + value + "'"),
+ group.map(Group::selection))
+ .flatMap(Optional::stream)
+ .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
+ StringJoiner::add,
+ StringJoiner::merge)
+ .toString());
+
+ continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
+ parameters.setFieldSet(fieldSet.orElse(documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
+ wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
+ parameters.setMaxTotalHits(wantedDocumentCount.orElse(1 << 10));
+ concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency.orElse(1)));
+ parameters.setTimeoutMs(visitTimeout.toMillis());
+ parameters.visitInconsistentBuckets(true);
+ parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
+
+ StorageCluster storageCluster = resolveCluster(cluster, clusters);
+ parameters.setRoute(storageCluster.route());
+ parameters.setBucketSpace(resolveBucket(storageCluster,
+ documentType,
+ List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
+ bucketSpace));
+
+ return parameters;
+ }
+
+ public static Builder builder() { return new Builder(); }
+
+
+ public static class Builder {
+
+ private String cluster;
+ private String documentType;
+ private String namespace;
+ private Group group;
+ private String selection;
+ private String fieldSet;
+ private String continuation;
+ private String bucketSpace;
+ private Integer wantedDocumentCount;
+ private Integer concurrency;
+
+ public Builder cluster(String cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+
+ public Builder documentType(String documentType) {
+ this.documentType = documentType;
+ return this;
+ }
+
+ public Builder namespace(String namespace) {
+ this.namespace = namespace;
+ return this;
+ }
+
+ public Builder group(Group group) {
+ this.group = group;
+ return this;
+ }
+
+ public Builder selection(String selection) {
+ this.selection = selection;
+ return this;
+ }
+
+ public Builder fieldSet(String fieldSet) {
+ this.fieldSet = fieldSet;
+ return this;
+ }
+
+ public Builder continuation(String continuation) {
+ this.continuation = continuation;
+ return this;
+ }
+
+ public Builder bucketSpace(String bucketSpace) {
+ this.bucketSpace = bucketSpace;
+ return this;
+ }
+
+ public Builder wantedDocumentCount(Integer wantedDocumentCount) {
+ this.wantedDocumentCount = wantedDocumentCount;
+ return this;
+ }
+
+ public Builder concurrency(Integer concurrency) {
+ this.concurrency = concurrency;
+ return this;
+ }
+
+ public VisitorOptions build() {
+ return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
+ Optional.ofNullable(namespace), Optional.ofNullable(group),
+ Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
+ Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
+ Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
+ }
+
+ }
+
+ }
+
+
+ public static class Group {
+
+ private final String value;
+ private final String docIdPart;
+ private final String selection;
+
+ private Group(String value, String docIdPart, String selection) {
+ Text.validateTextString(value)
+ .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
+ this.value = value;
+ this.docIdPart = docIdPart;
+ this.selection = selection;
+ }
+
+ public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
+ public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
+
+ public String value() { return value; }
+ public String docIdPart() { return docIdPart; }
+ public String selection() { return selection; }
+
+ }
+
+
+ /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
+ private void accept(Supplier<Result> operation, OperationContext context) {
+ timeouts.add(operation, context);
+ send(operation, context);
+ }
+
+ /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */
+ private void send(Supplier<Result> operation, OperationContext context) {
+ Result result = operation.get();
+ switch (result.type()) {
+ case SUCCESS:
+ outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
+ break;
+ case TRANSIENT_ERROR:
+ if ( ! throttled.add(operation, context))
+ context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
+ break;
+ default:
+ log.log(WARNING, "Unknown result type '" + result.type() + "'");
+ case FATAL_ERROR: // intentional fallthrough
+ context.error(ERROR, result.getError().getMessage());
+ }
+ }
+
+ private void handle(Response response) {
+ outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
+ }
+
+ private static ErrorType toErrorType(Response.Outcome outcome) {
+ switch (outcome) {
+ case NOT_FOUND:
+ return NOT_FOUND;
+ case CONDITION_FAILED:
+ return PRECONDITION_FAILED;
+ default:
+ log.log(WARNING, "Unexpected response outcome: " + outcome);
+ case ERROR: // intentional fallthrough
+ return ERROR;
+ }
+ }
+
+
+ /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
+ private static class Context<T> {
+
+ private final AtomicBoolean handled = new AtomicBoolean();
+ private final BiConsumer<ErrorType, String> onError;
+ private final Consumer<T> onSuccess;
+
+ Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
+ this.onError = onError;
+ this.onSuccess = onSuccess;
+ }
+
+ void error(ErrorType type, String message) {
+ if ( ! handled.getAndSet(true))
+ onError.accept(type, message);
+ }
+
+ void success(T result) {
+ if ( ! handled.getAndSet(true))
+ onSuccess.accept(result);
+ }
+
+ boolean handled() {
+ return handled.get();
+ }
+
+ }
+
+
+ private static class Completion {
+
+ private final OperationContext context;
+ private final Response response;
+
+ private Completion(OperationContext context, Response response) {
+ this.context = context;
+ this.response = response;
+ }
+
+ static Completion of(OperationContext context) {
+ return new Completion(requireNonNull(context), null);
+ }
+
+ static Completion of(Response response) {
+ return new Completion(null, requireNonNull(response));
+ }
+
+ Completion merge(Completion other) {
+ if (context == null)
+ complete(other.context, response);
+ else
+ complete(context, other.response);
+ return null;
+ }
+
+ private void complete(OperationContext context, Response response) {
+ if (response.isSuccess())
+ context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
+ : Optional.empty());
+ else
+ context.error(toErrorType(response.outcome()), response.getTextMessage());
+ }
+
+ }
+
+
+ /**
+ * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
+ *
+ * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
+ * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
+ */
+ static class DelayQueue {
+
+ private final long maxSize;
+ private final Clock clock;
+ private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
+ private final AtomicLong size = new AtomicLong(0);
+ private final Thread maintainer;
+ private final Duration delay;
+ private final long defaultWaitMillis;
+
+ public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
+ if (maxSize < 0)
+ throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
+ if (delay.isNegative())
+ throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
+
+ this.maxSize = maxSize;
+ this.delay = delay;
+ this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
+ this.clock = requireNonNull(clock);
+ this.maintainer = new Thread(() -> maintain(action));
+ this.maintainer.start();
+ }
+
+ boolean add(Supplier<Result> operation, OperationContext context) {
+ if (size.incrementAndGet() > maxSize) {
+ size.decrementAndGet();
+ return false;
+ }
+ return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
+ }
+
+ long size() { return size.get(); }
+
+ Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
+ ExecutorService shutdownService = Executors.newSingleThreadExecutor();
+ Future<?> future = shutdownService.submit(() -> {
+ try {
+ Thread.sleep(grace.toMillis());
+ }
+ finally {
+ maintainer.interrupt();
+ for (Delayed delayed; (delayed = queue.poll()) != null; ) {
+ size.decrementAndGet();
+ onShutdown.accept(delayed.context());
+ }
+ }
+ return null;
+ });
+ shutdownService.shutdown();
+ return future;
+ }
+
+ /**
+ * Repeatedly loops through the queue, evicting already handled entries and processing those
+ * which have become ready since last time, then waits until new items are guaranteed to be ready,
+ * or until it's time for a new run just to ensure GC of handled entries.
+ * The entries are assumed to always be added to the back of the queue, with the same delay.
+ * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
+ * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
+ */
+ private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) {
+ while ( ! Thread.currentThread().isInterrupted()) {
+ try {
+ Instant waitUntil = null;
+ Iterator<Delayed> operations = queue.iterator();
+ while (operations.hasNext()) {
+ Delayed delayed = operations.next();
+ // Already handled: remove and continue.
+ if (delayed.context().handled()) {
+ operations.remove();
+ size.decrementAndGet();
+ continue;
+ }
+ // Ready for action: remove from queue and run.
+ if (delayed.readyAt().isBefore(clock.instant())) {
+ operations.remove();
+ size.decrementAndGet();
+ action.accept(delayed.operation(), delayed.context());
+ continue;
+ }
+ // Not yet ready for action: keep time to wake up again.
+ waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
+ }
+ long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
+ synchronized (this) {
+ do {
+ notify();
+ wait(Math.max(0, waitUntilMillis - clock.millis()));
+ }
+ while (clock.millis() < waitUntilMillis);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (Exception e) {
+ log.log(SEVERE, "Exception caught by delay queue maintainer", e);
+ }
+ }
+ }
+ }
+
+
+ private static class Delayed {
+
+ private final Supplier<Result> operation;
+ private final OperationContext context;
+ private final Instant readyAt;
+
+ Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
+ this.readyAt = requireNonNull(readyAt);
+ this.context = requireNonNull(context);
+ this.operation = requireNonNull(operation);
+ }
+
+ Supplier<Result> operation() { return operation; }
+ OperationContext context() { return context; }
+ Instant readyAt() { return readyAt; }
+
+ }
+
+
+ static class StorageCluster {
+
+ private final String name;
+ private final String configId;
+ private final Map<String, String> documentBuckets;
+
+ StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
+ this.name = requireNonNull(name);
+ this.configId = requireNonNull(configId);
+ this.documentBuckets = Map.copyOf(documentBuckets);
+ }
+
+ String name() { return name; }
+ String configId() { return configId; }
+ String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
+ Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
+
+ }
+
+
+ private static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
+ if (clusters.isEmpty())
+ throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled.");
+
+ return wanted.map(cluster -> {
+ if ( ! clusters.containsKey(cluster))
+ throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.get(cluster);
+ }).orElseGet(() -> {
+ if (clusters.size() > 1)
+ throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.values().iterator().next();
+ });
+ }
+
+ private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
+ List<String> bucketSpaces, Optional<String> bucketSpace) {
+ return documentType.map(type -> cluster.bucketOf(type)
+ .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
+ "' is not mapped to a known bucket space")))
+ .or(() -> bucketSpace.map(space -> {
+ if ( ! bucketSpaces.contains(space))
+ throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
+ String.join(", ", bucketSpaces));
+ return space;
+ }))
+ .orElse(FixedBucketSpaces.defaultSpace());
+ }
+
+
+
+ private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
+ return clusters.storage().stream()
+ .collect(toUnmodifiableMap(storage -> storage.name(),
+ storage -> new StorageCluster(storage.name(),
+ storage.configid(),
+ buckets.cluster(storage.name())
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> entry.getKey(),
+ entry -> entry.getValue().bucketSpace())))));
+ }
+
+
+ // Visible for testing.
+ AsyncSession asyncSession() { return asyncSession; }
+ Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
+ void notifyMaintainers() throws InterruptedException {
+ synchronized (throttled) { throttled.notify(); throttled.wait(); }
+ synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
new file mode 100644
index 00000000000..96ea6c08f86
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -0,0 +1,603 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi.resource;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.container.core.documentapi.VespaDocumentAccess;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.json.JsonReader;
+import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.json.document.DocumentParser;
+import com.yahoo.document.restapi.DocumentOperationExecutor;
+import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
+import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
+import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.documentapi.metrics.DocumentOperationStatus;
+import com.yahoo.documentapi.metrics.DocumentOperationType;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ReadableContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.handler.UnsafeContentInputStream;
+import com.yahoo.container.core.HandlerMetricContextUtil;
+import com.yahoo.jdisc.http.HttpRequest;
+import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.restapi.Path;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.logging.Logger;
+
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
+import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
+import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
+import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
+import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Asynchronous HTTP handler for /document/v1/
+ *
+ * @author jonmv
+ */
+public class DocumentV1ApiHandler extends AbstractRequestHandler {
+
+ private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName());
+ private static final Parser<Integer> numberParser = Integer::parseInt;
+ private static final Parser<Boolean> booleanParser = Boolean::parseBoolean;
+
+ private static final CompletionHandler logException = new CompletionHandler() {
+ @Override public void completed() { }
+ @Override public void failed(Throwable t) {
+ log.log(WARNING, "Exception writing response data", t);
+ }
+ };
+
+ private static final ContentChannel ignoredContent = new ContentChannel() {
+ @Override public void write(ByteBuffer buf, CompletionHandler handler) { handler.completed(); }
+ @Override public void close(CompletionHandler handler) { handler.completed(); }
+ };
+
+ private static final String CREATE = "create";
+ private static final String CONDITION = "condition";
+ private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get
+ private static final String FIELD_SET = "fieldSet";
+ private static final String SELECTION = "selection";
+ private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get
+ private static final String CONTINUATION = "continuation";
+ private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
+ private static final String CONCURRENCY = "concurrency";
+ private static final String BUCKET_SPACE = "bucketSpace";
+
+ private final Clock clock;
+ private final Metric metric; // TODO jonmv: make response class which logs on completion/error
+ private final DocumentApiMetrics metrics;
+ private final DocumentOperationExecutor executor;
+ private final DocumentOperationParser parser;
+ private final Map<String, Map<Method, Handler>> handlers;
+
+ @Inject
+ public DocumentV1ApiHandler(Clock clock,
+ Metric metric,
+ MetricReceiver metricReceiver,
+ VespaDocumentAccess documentAccess,
+ DocumentmanagerConfig documentManagerConfig,
+ ClusterListConfig clusterListConfig,
+ AllClustersBucketSpacesConfig bucketSpacesConfig,
+ DocumentOperationExecutorConfig executorConfig) {
+ this(clock,
+ new DocumentOperationExecutor(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
+ new DocumentOperationParser(documentManagerConfig),
+ metric,
+ metricReceiver);
+ }
+
+ DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser,
+ Metric metric, MetricReceiver metricReceiver) {
+ this.clock = clock;
+ this.executor = executor;
+ this.parser = parser;
+ this.metric = metric;
+ this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+ this.handlers = defineApi();
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) {
+ HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass());
+ ResponseHandler responseHandler = response -> {
+ HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass());
+ return rawResponseHandler.handleResponse(response);
+ };
+
+ HttpRequest request = (HttpRequest) rawRequest;
+ try {
+ Path requestPath = new Path(request.getUri());
+ for (String path : handlers.keySet())
+ if (requestPath.matches(path)) {
+ Map<Method, Handler> methods = handlers.get(path);
+ if (methods.containsKey(request.getMethod()))
+ return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
+
+ if (request.getMethod() == OPTIONS)
+ return options(methods.keySet(), responseHandler);
+
+ return methodNotAllowed(request, methods.keySet(), responseHandler);
+ }
+ return notFound(request, handlers.keySet(), responseHandler);
+ }
+ catch (IllegalArgumentException e) {
+ return badRequest(request, e, responseHandler);
+ }
+ catch (RuntimeException e) {
+ return serverError(request, e, responseHandler);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ this.executor.shutdown();
+ }
+
+ private Map<String, Map<Method, Handler>> defineApi() {
+ Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
+
+ handlers.put("/document/v1/",
+ Map.of(GET, this::getRoot));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ return Collections.unmodifiableMap(handlers);
+ }
+
+ private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ Cursor documents = root.setArray("documents");
+ executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ VisitorOptions.Builder options = parseOptions(request, path);
+ options = options.documentType(path.documentType());
+ options = options.namespace(path.namespace());
+ options = path.group().map(options::group).orElse(options);
+ executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) {
+ Object monitor = new Object();
+ return new VisitOperationsContext((type, message) -> {
+ synchronized (monitor) {
+ handleError(request, type, message, root, handler);
+ }
+ },
+ token -> {
+ token.ifPresent(value -> root.setString("continuation", value));
+ synchronized (monitor) {
+ respond(root, handler);
+ }
+ },
+ // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway
+ document -> {
+ synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization.
+ SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(),
+ documents.addObject());
+ }
+ });
+ }
+ private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ DocumentId id = path.id();
+ DocumentOperationParameters parameters = parameters();
+ parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters);
+ parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters);
+ executor.get(id,
+ parameters,
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ document -> {
+ Cursor root = responseRoot(request, id);
+ document.map(JsonWriter::toByteArray)
+ .map(SlimeUtils::jsonToSlime)
+ .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields)")));
+ respond(document.isPresent() ? 200 : 404,
+ root,
+ handler);
+ }));
+ return ignoredContent;
+ }
+
+ private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.PUT, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentPut put = parser.parsePut(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
+ executor.put(put,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
+ }
+ });
+ }
+
+ private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.UPDATE, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentUpdate update = parser.parseUpdate(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
+ getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
+ executor.update(update,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
+ }
+ });
+ }
+
+ private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.REMOVE, clock.instant());
+ executor.remove(id,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ return ignoredContent;
+ }
+
+ private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) {
+ switch (type) {
+ case BAD_REQUEST:
+ badRequest(request, message, root, handler);
+ break;
+ case NOT_FOUND:
+ notFound(request, message, root, handler);
+ break;
+ case PRECONDITION_FAILED:
+ preconditionFailed(request, message, root, handler);
+ break;
+ case OVERLOAD:
+ overload(request, message, root, handler);
+ break;
+ case TIMEOUT:
+ timeout(request, message, root, handler);
+ break;
+ default:
+ log.log(WARNING, "Unexpected error type '" + type + "'");
+ case ERROR: // intentional fallthrough
+ serverError(request, message, root, handler);
+ }
+ }
+
+ // ------------------------------------------------ Responses ------------------------------------------------
+
+ private static Cursor responseRoot(HttpRequest request) {
+ Cursor root = new Slime().setObject();
+ root.setString("pathId", request.getUri().getRawPath());
+ return root;
+ }
+
+ private static Cursor responseRoot(HttpRequest request, DocumentId id) {
+ Cursor root = responseRoot(request);
+ root.setString("id", id.toString());
+ return root;
+ }
+
+ private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) {
+ Response response = new Response(Response.Status.NO_CONTENT);
+ response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
+ handler.handleResponse(response).close(logException);
+ return ignoredContent;
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
+ return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler);
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.BAD_REQUEST, root, handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
+ return notFound(request,
+ "Nothing at '" + request.getUri().getRawPath() + "'. " +
+ "Available paths are:\n" + String.join("\n", paths),
+ responseRoot(request),
+ handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.NOT_FOUND, root, handler);
+ }
+
+ private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ root.setString("message",
+ "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
+ "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")));
+ return respond(Response.Status.METHOD_NOT_ALLOWED,
+ root,
+ handler);
+ }
+
+ private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.PRECONDITION_FAILED, root, handler);
+ }
+
+ private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.TOO_MANY_REQUESTS, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e);
+ Cursor root = responseRoot(request);
+ root.setString("message", Exceptions.toMessageString(e));
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.GATEWAY_TIMEOUT, root, handler);
+ }
+
+ private static ContentChannel respond(Inspector root, ResponseHandler handler) {
+ return respond(200, root, handler);
+ }
+
+ private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) {
+ Response response = new Response(status);
+ response.headers().put("Content-Type", "application/json; charset=UTF-8");
+ ContentChannel out = null;
+ try {
+ out = handler.handleResponse(new Response(status));
+ out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ finally {
+ if (out != null) try {
+ out.close(logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ }
+ return ignoredContent;
+ }
+
+ // ------------------------------------------------ Helpers ------------------------------------------------
+
+ private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
+ VisitorOptions.Builder options = VisitorOptions.builder();
+
+ getProperty(request, SELECTION).ifPresent(options::selection);
+ getProperty(request, CONTINUATION).ifPresent(options::continuation);
+ getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
+ getProperty(request, CLUSTER).ifPresent(options::cluster);
+ getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
+ getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
+ .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
+ getProperty(request, CONCURRENCY, numberParser)
+ .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
+
+ return options;
+ }
+
+ static class DocumentPath {
+
+ private final Path path;
+ private final Optional<Group> group;
+
+ DocumentPath(Path path) {
+ this.path = requireNonNull(path);
+ this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
+ .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
+ }
+
+ DocumentId id() {
+ return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
+ ":" + requireNonNull(path.get("documentType")) +
+ ":" + group.map(Group::docIdPart).orElse("") +
+ ":" + requireNonNull(path.get("docid")));
+ }
+
+ String documentType() { return requireNonNull(path.get("documentType")); }
+ String namespace() { return requireNonNull(path.get("namespace")); }
+ Optional<Group> group() { return group; }
+
+ }
+
+ private static Optional<String> getProperty(HttpRequest request, String name) {
+ List<String> values = request.parameters().get(name);
+ if (values != null && values.size() != 0)
+ return Optional.ofNullable(values.get(values.size() - 1));
+
+ return Optional.empty();
+ }
+
+ private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
+ return getProperty(request, name).map(parser::parse);
+ }
+
+
+ @FunctionalInterface
+ interface Parser<T> extends Function<String, T> {
+ default T parse(String value) {
+ try {
+ return apply(value);
+ }
+ catch (RuntimeException e) {
+ throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
+ }
+ }
+ }
+
+
+ @FunctionalInterface
+ interface Handler {
+ ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
+ }
+
+
+ /** Readable content channel which forwards data to a reader when closed. */
+ static class ForwardingContentChannel implements ContentChannel {
+
+ private final ReadableContentChannel delegate = new ReadableContentChannel();
+ private final Consumer<InputStream> reader;
+
+ public ForwardingContentChannel(Consumer<InputStream> reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ delegate.write(buf, handler);
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ delegate.close(handler);
+ try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) {
+ reader.accept(in);
+ }
+ }
+
+ }
+
+
+ private static class DocumentOperationParser {
+
+ private static final JsonFactory jsonFactory = new JsonFactory();
+
+ private final DocumentTypeManager manager;
+
+ DocumentOperationParser(DocumentmanagerConfig config) {
+ this.manager = new DocumentTypeManager(config);
+ }
+
+ DocumentPut parsePut(InputStream inputStream, String docId) {
+ return (DocumentPut) parse(inputStream, docId, DocumentParser.SupportedOperation.PUT);
+ }
+
+ DocumentUpdate parseUpdate(InputStream inputStream, String docId) {
+ return (DocumentUpdate) parse(inputStream, docId, DocumentParser.SupportedOperation.UPDATE);
+ }
+
+ private DocumentOperation parse(InputStream inputStream, String docId, DocumentParser.SupportedOperation operation) {
+ return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId);
+ }
+
+ }
+
+ private class MeasuringResponseHandler implements ResponseHandler {
+
+ private final ResponseHandler delegate;
+ private final DocumentOperationType type;
+ private final Instant start;
+
+ private MeasuringResponseHandler(ResponseHandler delegate, DocumentOperationType type, Instant start) {
+ this.delegate = delegate;
+ this.type = type;
+ this.start = start;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ switch (response.getStatus() / 100) {
+ case 2: metrics.reportSuccessful(type, start); break;
+ case 4: metrics.reportFailure(type, DocumentOperationStatus.REQUEST_ERROR); break;
+ case 5: metrics.reportFailure(type, DocumentOperationStatus.SERVER_ERROR); break;
+ }
+ return delegate.handleResponse(response);
+ }
+
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
new file mode 100644
index 00000000000..19f4f50648b
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
@@ -0,0 +1,15 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package=com.yahoo.document.restapi
+
+# Delay before a throttled operation is retried.
+resendDelayMillis int default=100
+
+# Time between a document operation is received and a timeout response is sent
+defaultTimeoutSeconds int default=180
+
+# Time after which a visitor session times out
+visitTimeoutSeconds int default=120
+
+# Bound on number of document operations to keep in retry queue — further operations are rejected
+maxThrottled int default=200
+