summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-09-29 08:10:05 +0200
committerGitHub <noreply@github.com>2020-09-29 08:10:05 +0200
commitc6aded1606112a54969f56403085ca90d61dac8f (patch)
treedb29615090e57241998ec0deb1c55a49632c3623
parent09bf1d5f22a7ae98191c94e9be591994b5125557 (diff)
Revert "Jonmv/async doc v1 implementation"
-rw-r--r--container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java54
-rw-r--r--container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java32
-rw-r--r--documentapi/abi-spec.json15
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java136
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java6
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java7
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java66
-rw-r--r--vespaclient-container-plugin/pom.xml6
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java704
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java603
-rw-r--r--vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def15
11 files changed, 106 insertions, 1538 deletions
diff --git a/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java b/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java
deleted file mode 100644
index cfb48339dbe..00000000000
--- a/container-core/src/main/java/com/yahoo/container/core/HandlerMetricContextUtil.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.yahoo.container.core;
-
-import com.yahoo.jdisc.Metric;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.application.BindingMatch;
-import com.yahoo.jdisc.application.UriPattern;
-import com.yahoo.jdisc.handler.ResponseHandler;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Common HTTP request handler metrics code.
- *
- * @author jonmv
- */
-public class HandlerMetricContextUtil {
-
- public static void onHandle(Request request, Metric metric, Class<?> handlerClass) {
- metric.add("handled.requests", 1, contextFor(request, metric, handlerClass));
- }
-
- public static void onHandled(Request request, Metric metric, Class<?> handlerClass) {
- metric.set("handled.latency", request.timeElapsed(TimeUnit.MILLISECONDS), contextFor(request, metric, handlerClass));
- }
-
- public static Metric.Context contextFor(Request request, Metric metric, Class<?> handlerClass) {
- return contextFor(request, Map.of(), metric, handlerClass);
- }
-
- public static Metric.Context contextFor(Request request, Map<String, String> extraDimensions, Metric metric, Class<?> handlerClass) {
- BindingMatch<?> match = request.getBindingMatch();
- if (match == null) return null;
- UriPattern matched = match.matched();
- if (matched == null) return null;
- String name = matched.toString();
- String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null;
-
- Map<String, String> dimensions = new HashMap<>();
- dimensions.put("handler", name);
- if (endpoint != null) {
- dimensions.put("endpoint", endpoint);
- }
- URI uri = request.getUri();
- dimensions.put("scheme", uri.getScheme());
- dimensions.put("port", Integer.toString(uri.getPort()));
- dimensions.put("handler-name", handlerClass.getName());
- dimensions.putAll(extraDimensions);
- return metric.createContext(dimensions);
- }
-
-}
diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
index f5f8b428535..ab768dba0d2 100644
--- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
+++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java
@@ -6,6 +6,8 @@ import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.ResourceReference;
import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.application.BindingMatch;
+import com.yahoo.jdisc.application.UriPattern;
import com.yahoo.jdisc.handler.AbstractRequestHandler;
import com.yahoo.jdisc.handler.BufferedContentChannel;
import com.yahoo.jdisc.handler.ContentChannel;
@@ -13,9 +15,10 @@ import com.yahoo.jdisc.handler.OverloadException;
import com.yahoo.jdisc.handler.ReadableContentChannel;
import com.yahoo.jdisc.handler.ResponseDispatch;
import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.container.core.HandlerMetricContextUtil;
+import java.net.URI;
import java.time.Duration;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
@@ -76,9 +79,29 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
}
Metric.Context contextFor(Request request, Map<String, String> extraDimensions) {
- return HandlerMetricContextUtil.contextFor(request, extraDimensions, metric, getClass());
+ BindingMatch<?> match = request.getBindingMatch();
+ if (match == null) return null;
+ UriPattern matched = match.matched();
+ if (matched == null) return null;
+ String name = matched.toString();
+ String endpoint = request.headers().containsKey("Host") ? request.headers().get("Host").get(0) : null;
+
+ Map<String, String> dimensions = new HashMap<>();
+ dimensions.put("handler", name);
+ if (endpoint != null) {
+ dimensions.put("endpoint", endpoint);
+ }
+ URI uri = request.getUri();
+ dimensions.put("scheme", uri.getScheme());
+ dimensions.put("port", Integer.toString(uri.getPort()));
+ String handlerClassName = getClass().getName();
+ dimensions.put("handler-name", handlerClassName);
+ dimensions.putAll(extraDimensions);
+ return this.metric.createContext(dimensions);
}
+ private Metric.Context contextFor(Request request) { return contextFor(request, Map.of()); }
+
/**
* Handles a request by assigning a worker thread to it.
*
@@ -86,7 +109,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
*/
@Override
public final ContentChannel handleRequest(Request request, ResponseHandler responseHandler) {
- HandlerMetricContextUtil.onHandle(request, metric, getClass());
+ metric.add("handled.requests", 1, contextFor(request));
if (request.getTimeout(TimeUnit.SECONDS) == null) {
Duration timeout = getTimeout();
if (timeout != null) {
@@ -189,7 +212,8 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler {
public ContentChannel handleResponse(Response response) {
if ( tryHasResponded()) throw new IllegalStateException("Response already handled");
ContentChannel cc = responseHandler.handleResponse(response);
- HandlerMetricContextUtil.onHandled(request, metric, getClass());
+ long millis = request.timeElapsed(TimeUnit.MILLISECONDS);
+ metric.set("handled.latency", millis, contextFor(request));
return cc;
}
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index c8cbc978a8f..f5f2a7c1845 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -979,9 +979,7 @@
"public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
"public com.yahoo.documentapi.Response getNext()",
"public com.yahoo.documentapi.Response getNext(int)",
- "public void destroy()",
- "public void setPhaser(java.util.concurrent.Phaser)",
- "public void setResultType(com.yahoo.documentapi.Result$ResultType)"
+ "public void destroy()"
],
"fields": []
},
@@ -993,15 +991,12 @@
],
"methods": [
"public void <init>(com.yahoo.documentapi.DocumentAccessParams)",
- "public com.yahoo.documentapi.local.LocalSyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
- "public com.yahoo.documentapi.local.LocalAsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
- "public com.yahoo.documentapi.local.LocalVisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
+ "public com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
+ "public com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
+ "public com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
"public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)",
"public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)",
- "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)",
- "public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
- "public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
- "public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)"
+ "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)"
],
"fields": []
},
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index ff3eeb02a71..40f26a82a89 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -20,36 +20,29 @@ import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.UpdateResponse;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
-import static com.yahoo.documentapi.Result.ResultType.SUCCESS;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
/**
* @author bratseth
- * @author jonmv
*/
public class LocalAsyncSession implements AsyncSession {
- private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
+ private final List<Response> responses = new LinkedList<>();
private final ResponseHandler handler;
private final SyncSession syncSession;
- private final Executor executor = Executors.newCachedThreadPool();
+ private long requestId = 0;
+ private Random random = new Random();
- private AtomicLong requestId = new AtomicLong(0);
- private AtomicReference<Phaser> phaser = new AtomicReference<>();
- private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
+ private synchronized long getNextRequestId() {
+ requestId++;
+ return requestId;
+ }
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
+ random.setSeed(System.currentTimeMillis());
syncSession = access.createSyncSession(new SyncParameters.Builder().build());
}
@@ -65,15 +58,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
- return send(req -> {
- try {
- syncSession.put(documentPut, pri);
- return new DocumentResponse(req, documentPut.getDocument());
- }
- catch (Exception e) {
- return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
- }
- });
+ long req = getNextRequestId();
+ try {
+ syncSession.put(documentPut, pri);
+ addResponse(new DocumentResponse(req, documentPut.getDocument()));
+ } catch (Exception e) {
+ addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR));
+ }
+ return new Result(req);
}
@Override
@@ -89,14 +81,13 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result get(DocumentId id, DocumentProtocol.Priority pri) {
- return send(req -> {
- try {
- return new DocumentResponse(req, syncSession.get(id));
- }
- catch (Exception e) {
- return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
- }
- });
+ long req = getNextRequestId();
+ try {
+ addResponse(new DocumentResponse(req, syncSession.get(id)));
+ } catch (Exception e) {
+ addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR));
+ }
+ return new Result(req);
}
@Override
@@ -106,14 +97,13 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
- return send(req -> {
- if (syncSession.remove(new DocumentRemove(id), pri)) {
- return new RemoveResponse(req, true);
- }
- else {
- return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
- }
- });
+ long req = getNextRequestId();
+ if (syncSession.remove(new DocumentRemove(id), pri)) {
+ addResponse(new RemoveResponse(req, true));
+ } else {
+ addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND));
+ }
+ return new Result(req);
}
@Override
@@ -123,24 +113,27 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- return send(req -> {
- if (syncSession.update(update, pri)) {
- return new UpdateResponse(req, true);
- }
- else {
- return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
- }
- });
+ long req = getNextRequestId();
+ if (syncSession.update(update, pri)) {
+ addResponse(new UpdateResponse(req, true));
+ } else {
+ addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND));
+ }
+ return new Result(req);
}
@Override
public Response getNext() {
- return responses.poll();
+ if (responses.isEmpty()) {
+ return null;
+ }
+ int index = random.nextInt(responses.size());
+ return responses.remove(index);
}
@Override
- public Response getNext(int timeoutMilliseconds) throws InterruptedException {
- return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS);
+ public Response getNext(int timeout) {
+ return getNext();
}
@Override
@@ -148,22 +141,6 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
- /**
- * When this is set, every operation is sent in a separate thread, which first registers with the given phaser,
- * and then arrives and awaits advance so the user can trigger responses. After the response is delivered,
- * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered.
- *
- * If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
- */
- public void setPhaser(Phaser phaser) {
- this.phaser.set(phaser);
- }
-
- /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
- public void setResultType(Result.ResultType resultType) {
- this.result.set(resultType);
- }
-
private void addResponse(Response response) {
if (handler != null) {
handler.handleResponse(response);
@@ -172,23 +149,4 @@ public class LocalAsyncSession implements AsyncSession {
}
}
- private Result send(Function<Long, Response> responses) {
- Result.ResultType resultType = result.get();
- if (resultType != SUCCESS)
- return new Result(resultType, new Error());
-
- long req = requestId.incrementAndGet();
- Phaser synchronizer = phaser.get();
- if (synchronizer == null)
- addResponse(responses.apply(req));
- else
- executor.execute(() -> {
- synchronizer.register();
- synchronizer.arriveAndAwaitAdvance();
- addResponse(responses.apply(req));
- synchronizer.arriveAndDeregister();
- });
- return new Result(req);
- }
-
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index e24853b9294..c69a8fb48de 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess {
}
@Override
- public LocalSyncSession createSyncSession(SyncParameters parameters) {
+ public SyncSession createSyncSession(SyncParameters parameters) {
return new LocalSyncSession(this);
}
@Override
- public LocalAsyncSession createAsyncSession(AsyncParameters parameters) {
+ public AsyncSession createAsyncSession(AsyncParameters parameters) {
return new LocalAsyncSession(parameters, this);
}
@Override
- public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
+ public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
return new LocalVisitorSession(this, parameters);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index 85be1c11fcd..f087b646ca4 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -76,13 +76,8 @@ public class LocalVisitorSession implements VisitorSession {
if (state.get() != State.RUNNING)
return;
- try {
- if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
- return;
- }
- catch (RuntimeException e) {
+ if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
return;
- }
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 33cae60ab93..69dc7c6da74 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -36,12 +36,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -91,22 +85,17 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
}
@Test
- public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException {
- LocalAsyncSession session = access.createAsyncSession(new AsyncParameters());
+ public void testAsyncFetch() {
+ AsyncSession session = access.createAsyncSession(new AsyncParameters());
List<DocumentId> ids = new ArrayList<>();
ids.add(new DocumentId("id:music:music::1"));
ids.add(new DocumentId("id:music:music::2"));
ids.add(new DocumentId("id:music:music::3"));
for (DocumentId id : ids)
session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
-
- // Let all async operations wait for a signal from the test thread before sending their responses, and let test
- // thread wait for all responses to be delivered afterwards.
- Phaser phaser = new Phaser(1);
- session.setPhaser(phaser);
+ int timeout = 100;
long startTime = System.currentTimeMillis();
- int timeoutMillis = 1000;
Set<Long> outstandingRequests = new HashSet<>();
for (DocumentId id : ids) {
Result result = session.get(id);
@@ -115,38 +104,27 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
outstandingRequests.add(result.getRequestId());
}
- // Wait for responses in separate thread.
- Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> {
- try {
- List<Document> documents = new ArrayList<>();
- while ( ! outstandingRequests.isEmpty()) {
- int timeSinceStart = (int) (System.currentTimeMillis() - startTime);
- Response response = session.getNext(timeoutMillis - timeSinceStart);
- if (response == null)
- throw new RuntimeException("Timed out waiting for documents"); // or return what you have
- if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
-
- if (response.isSuccess())
- documents.add(((DocumentResponse) response).getDocument());
- outstandingRequests.remove(response.getRequestId());
- }
- assertEquals(3, documents.size());
- for (Document document : documents)
- assertNotNull(document);
- }
- catch (InterruptedException e) {
- throw new IllegalArgumentException("Interrupted while waiting for responses");
+ List<Document> documents = new ArrayList<>();
+ try {
+ while ( ! outstandingRequests.isEmpty()) {
+ int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
+ Response response = session.getNext(timeout - timeSinceStart);
+ if (response == null)
+ throw new RuntimeException("Timed out waiting for documents"); // or return what you have
+ if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
+
+ if (response.isSuccess())
+ documents.add(((DocumentResponse)response).getDocument());
+ outstandingRequests.remove(response.getRequestId());
}
- });
-
- // All operations, and receiver, now waiting for this thread to arrive.
- assertEquals(4, phaser.getRegisteredParties());
- assertEquals(0, phaser.getPhase());
- phaser.arrive();
- assertEquals(1, phaser.getPhase());
- phaser.awaitAdvance(phaser.arriveAndDeregister());
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for documents", e);
+ }
- futureWithAssertions.get(1000, TimeUnit.MILLISECONDS);
+ assertEquals(3, documents.size());
+ for (Document document : documents)
+ assertNotNull(document);
}
@Test
diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml
index 8254c208588..9c4b81da806 100644
--- a/vespaclient-container-plugin/pom.xml
+++ b/vespaclient-container-plugin/pom.xml
@@ -72,12 +72,6 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>testutil</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
<plugins>
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
deleted file mode 100644
index b675af3b564..00000000000
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
+++ /dev/null
@@ -1,704 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi;
-
-import com.yahoo.cloud.config.ClusterListConfig;
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.FixedBucketSpaces;
-import com.yahoo.document.fieldset.AllFields;
-import com.yahoo.document.select.parser.ParseException;
-import com.yahoo.documentapi.AsyncParameters;
-import com.yahoo.documentapi.AsyncSession;
-import com.yahoo.documentapi.DocumentAccess;
-import com.yahoo.documentapi.DocumentOperationParameters;
-import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
-import com.yahoo.documentapi.ProgressToken;
-import com.yahoo.documentapi.Response;
-import com.yahoo.documentapi.Result;
-import com.yahoo.documentapi.VisitorControlHandler;
-import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.messagebus.StaticThrottlePolicy;
-import com.yahoo.text.Text;
-import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
-import com.yahoo.yolean.Exceptions;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import java.util.logging.Logger;
-import java.util.stream.Stream;
-
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
-import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.SEVERE;
-import static java.util.logging.Level.WARNING;
-import static java.util.stream.Collectors.toMap;
-import static java.util.stream.Collectors.toUnmodifiableMap;
-
-/**
- * Encapsulates a document access and supports running asynchronous document
- * operations and visits against this, with retries and optional timeouts.
- *
- * @author jonmv
- */
-public class DocumentOperationExecutor {
-
- private static final Logger log = Logger.getLogger(DocumentOperationExecutor.class.getName());
-
- private final Duration visitTimeout;
- private final long maxThrottled;
- private final DocumentAccess access;
- private final AsyncSession asyncSession;
- private final Map<String, StorageCluster> clusters;
- private final Clock clock;
- private final DelayQueue throttled;
- private final DelayQueue timeouts;
- private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
- private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
-
- public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
- DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
- this(Duration.ofMillis(executorConfig.resendDelayMillis()),
- Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
- Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
- executorConfig.maxThrottled(),
- access,
- parseClusters(clustersConfig, bucketsConfig),
- clock);
- }
-
- DocumentOperationExecutor(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
- DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
- this.visitTimeout = requireNonNull(visitTimeout);
- this.maxThrottled = maxThrottled;
- this.access = requireNonNull(access);
- this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
- this.clock = requireNonNull(clock);
- this.clusters = Map.copyOf(clusters);
- this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
- this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
- }
-
- /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
- public void shutdown() {
- long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
- Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
- context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
- Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
- context -> context.error(TIMEOUT, "Timed out due to shutdown"));
- visits.values().forEach(VisitorSession::destroy);
- try {
- throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- }
- catch (Exception e) {
- log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
- }
- }
-
- public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.get(id, parameters), context);
- }
-
- public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.put(put, parameters), context);
- }
-
- public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.update(update, parameters), context);
- }
-
- public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.remove(id, parameters), context);
- }
-
- public void visit(VisitorOptions options, VisitOperationsContext context) {
- try {
- AtomicBoolean done = new AtomicBoolean(false);
- VisitorParameters parameters = options.asParameters(clusters, visitTimeout);
- parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
- @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
- @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
- });
- parameters.setControlHandler(new VisitorControlHandler() {
- @Override public void onDone(CompletionCode code, String message) {
- super.onDone(code, message);
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets())
- context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
- case SUCCESS: // intentional fallthrough
- case ABORTED:
- context.success(Optional.ofNullable(getProgress())
- .filter(progress -> ! progress.isFinished())
- .map(ProgressToken::serializeToString));
- break;
- default:
- context.error(ERROR, message != null ? message : "Visiting failed");
- }
- done.set(true); // This may be reached before dispatching thread is done putting us in the map.
- visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
- }
- });
- visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
- if (done.get())
- visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
- }
- catch (IllegalArgumentException | ParseException e) {
- context.error(BAD_REQUEST, Exceptions.toMessageString(e));
- }
- catch (RuntimeException e) {
- context.error(ERROR, Exceptions.toMessageString(e));
- }
- }
-
- public String routeToCluster(String cluster) {
- return resolveCluster(Optional.of(cluster), clusters).route();
- }
-
-
- public enum ErrorType {
- OVERLOAD,
- NOT_FOUND,
- PRECONDITION_FAILED,
- BAD_REQUEST,
- TIMEOUT,
- ERROR;
- }
-
-
- /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
- public static class VisitOperationsContext extends Context<Optional<String>> {
-
- private final Consumer<Document> onDocument;
-
- public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) {
- super(onError, onSuccess);
- this.onDocument = onDocument;
- }
-
- void document(Document document) {
- if ( ! handled())
- onDocument.accept(document);
- }
-
- }
-
-
- /** Context for a document operation. */
- public static class OperationContext extends Context<Optional<Document>> {
-
- public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
- super(onError, onSuccess);
- }
-
- }
-
-
- public static class VisitorOptions {
-
- private final Optional<String> cluster;
- private final Optional<String> namespace;
- private final Optional<String> documentType;
- private final Optional<Group> group;
- private final Optional<String> selection;
- private final Optional<String> fieldSet;
- private final Optional<String> continuation;
- private final Optional<String> bucketSpace;
- private final Optional<Integer> wantedDocumentCount;
- private final Optional<Integer> concurrency;
-
- private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
- Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
- Optional<String> continuation,Optional<String> bucketSpace,
- Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
- this.cluster = cluster;
- this.namespace = namespace;
- this.documentType = documentType;
- this.group = group;
- this.selection = selection;
- this.fieldSet = fieldSet;
- this.continuation = continuation;
- this.bucketSpace = bucketSpace;
- this.wantedDocumentCount = wantedDocumentCount;
- this.concurrency = concurrency;
- }
-
- private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) {
- if (cluster.isEmpty() && documentType.isEmpty())
- throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
-
- VisitorParameters parameters = new VisitorParameters(Stream.of(selection,
- documentType,
- namespace.map(value -> "id.namespace=='" + value + "'"),
- group.map(Group::selection))
- .flatMap(Optional::stream)
- .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
- StringJoiner::add,
- StringJoiner::merge)
- .toString());
-
- continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
- parameters.setFieldSet(fieldSet.orElse(documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
- wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
- parameters.setMaxTotalHits(wantedDocumentCount.orElse(1 << 10));
- concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
- parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency.orElse(1)));
- parameters.setTimeoutMs(visitTimeout.toMillis());
- parameters.visitInconsistentBuckets(true);
- parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
-
- StorageCluster storageCluster = resolveCluster(cluster, clusters);
- parameters.setRoute(storageCluster.route());
- parameters.setBucketSpace(resolveBucket(storageCluster,
- documentType,
- List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
- bucketSpace));
-
- return parameters;
- }
-
- public static Builder builder() { return new Builder(); }
-
-
- public static class Builder {
-
- private String cluster;
- private String documentType;
- private String namespace;
- private Group group;
- private String selection;
- private String fieldSet;
- private String continuation;
- private String bucketSpace;
- private Integer wantedDocumentCount;
- private Integer concurrency;
-
- public Builder cluster(String cluster) {
- this.cluster = cluster;
- return this;
- }
-
- public Builder documentType(String documentType) {
- this.documentType = documentType;
- return this;
- }
-
- public Builder namespace(String namespace) {
- this.namespace = namespace;
- return this;
- }
-
- public Builder group(Group group) {
- this.group = group;
- return this;
- }
-
- public Builder selection(String selection) {
- this.selection = selection;
- return this;
- }
-
- public Builder fieldSet(String fieldSet) {
- this.fieldSet = fieldSet;
- return this;
- }
-
- public Builder continuation(String continuation) {
- this.continuation = continuation;
- return this;
- }
-
- public Builder bucketSpace(String bucketSpace) {
- this.bucketSpace = bucketSpace;
- return this;
- }
-
- public Builder wantedDocumentCount(Integer wantedDocumentCount) {
- this.wantedDocumentCount = wantedDocumentCount;
- return this;
- }
-
- public Builder concurrency(Integer concurrency) {
- this.concurrency = concurrency;
- return this;
- }
-
- public VisitorOptions build() {
- return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
- Optional.ofNullable(namespace), Optional.ofNullable(group),
- Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
- Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
- Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
- }
-
- }
-
- }
-
-
- public static class Group {
-
- private final String value;
- private final String docIdPart;
- private final String selection;
-
- private Group(String value, String docIdPart, String selection) {
- Text.validateTextString(value)
- .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
- this.value = value;
- this.docIdPart = docIdPart;
- this.selection = selection;
- }
-
- public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
- public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
-
- public String value() { return value; }
- public String docIdPart() { return docIdPart; }
- public String selection() { return selection; }
-
- }
-
-
- /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
- private void accept(Supplier<Result> operation, OperationContext context) {
- timeouts.add(operation, context);
- send(operation, context);
- }
-
- /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */
- private void send(Supplier<Result> operation, OperationContext context) {
- Result result = operation.get();
- switch (result.type()) {
- case SUCCESS:
- outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
- break;
- case TRANSIENT_ERROR:
- if ( ! throttled.add(operation, context))
- context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
- break;
- default:
- log.log(WARNING, "Unknown result type '" + result.type() + "'");
- case FATAL_ERROR: // intentional fallthrough
- context.error(ERROR, result.getError().getMessage());
- }
- }
-
- private void handle(Response response) {
- outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
- }
-
- private static ErrorType toErrorType(Response.Outcome outcome) {
- switch (outcome) {
- case NOT_FOUND:
- return NOT_FOUND;
- case CONDITION_FAILED:
- return PRECONDITION_FAILED;
- default:
- log.log(WARNING, "Unexpected response outcome: " + outcome);
- case ERROR: // intentional fallthrough
- return ERROR;
- }
- }
-
-
- /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
- private static class Context<T> {
-
- private final AtomicBoolean handled = new AtomicBoolean();
- private final BiConsumer<ErrorType, String> onError;
- private final Consumer<T> onSuccess;
-
- Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
- this.onError = onError;
- this.onSuccess = onSuccess;
- }
-
- void error(ErrorType type, String message) {
- if ( ! handled.getAndSet(true))
- onError.accept(type, message);
- }
-
- void success(T result) {
- if ( ! handled.getAndSet(true))
- onSuccess.accept(result);
- }
-
- boolean handled() {
- return handled.get();
- }
-
- }
-
-
- private static class Completion {
-
- private final OperationContext context;
- private final Response response;
-
- private Completion(OperationContext context, Response response) {
- this.context = context;
- this.response = response;
- }
-
- static Completion of(OperationContext context) {
- return new Completion(requireNonNull(context), null);
- }
-
- static Completion of(Response response) {
- return new Completion(null, requireNonNull(response));
- }
-
- Completion merge(Completion other) {
- if (context == null)
- complete(other.context, response);
- else
- complete(context, other.response);
- return null;
- }
-
- private void complete(OperationContext context, Response response) {
- if (response.isSuccess())
- context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
- : Optional.empty());
- else
- context.error(toErrorType(response.outcome()), response.getTextMessage());
- }
-
- }
-
-
- /**
- * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
- *
- * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
- * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
- */
- static class DelayQueue {
-
- private final long maxSize;
- private final Clock clock;
- private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
- private final AtomicLong size = new AtomicLong(0);
- private final Thread maintainer;
- private final Duration delay;
- private final long defaultWaitMillis;
-
- public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
- if (maxSize < 0)
- throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
- if (delay.isNegative())
- throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
-
- this.maxSize = maxSize;
- this.delay = delay;
- this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
- this.clock = requireNonNull(clock);
- this.maintainer = new Thread(() -> maintain(action));
- this.maintainer.start();
- }
-
- boolean add(Supplier<Result> operation, OperationContext context) {
- if (size.incrementAndGet() > maxSize) {
- size.decrementAndGet();
- return false;
- }
- return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
- }
-
- long size() { return size.get(); }
-
- Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
- ExecutorService shutdownService = Executors.newSingleThreadExecutor();
- Future<?> future = shutdownService.submit(() -> {
- try {
- Thread.sleep(grace.toMillis());
- }
- finally {
- maintainer.interrupt();
- for (Delayed delayed; (delayed = queue.poll()) != null; ) {
- size.decrementAndGet();
- onShutdown.accept(delayed.context());
- }
- }
- return null;
- });
- shutdownService.shutdown();
- return future;
- }
-
- /**
- * Repeatedly loops through the queue, evicting already handled entries and processing those
- * which have become ready since last time, then waits until new items are guaranteed to be ready,
- * or until it's time for a new run just to ensure GC of handled entries.
- * The entries are assumed to always be added to the back of the queue, with the same delay.
- * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
- * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
- */
- private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) {
- while ( ! Thread.currentThread().isInterrupted()) {
- try {
- Instant waitUntil = null;
- Iterator<Delayed> operations = queue.iterator();
- while (operations.hasNext()) {
- Delayed delayed = operations.next();
- // Already handled: remove and continue.
- if (delayed.context().handled()) {
- operations.remove();
- size.decrementAndGet();
- continue;
- }
- // Ready for action: remove from queue and run.
- if (delayed.readyAt().isBefore(clock.instant())) {
- operations.remove();
- size.decrementAndGet();
- action.accept(delayed.operation(), delayed.context());
- continue;
- }
- // Not yet ready for action: keep time to wake up again.
- waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
- }
- long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
- synchronized (this) {
- do {
- notify();
- wait(Math.max(0, waitUntilMillis - clock.millis()));
- }
- while (clock.millis() < waitUntilMillis);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- catch (Exception e) {
- log.log(SEVERE, "Exception caught by delay queue maintainer", e);
- }
- }
- }
- }
-
-
- private static class Delayed {
-
- private final Supplier<Result> operation;
- private final OperationContext context;
- private final Instant readyAt;
-
- Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
- this.readyAt = requireNonNull(readyAt);
- this.context = requireNonNull(context);
- this.operation = requireNonNull(operation);
- }
-
- Supplier<Result> operation() { return operation; }
- OperationContext context() { return context; }
- Instant readyAt() { return readyAt; }
-
- }
-
-
- static class StorageCluster {
-
- private final String name;
- private final String configId;
- private final Map<String, String> documentBuckets;
-
- StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
- this.name = requireNonNull(name);
- this.configId = requireNonNull(configId);
- this.documentBuckets = Map.copyOf(documentBuckets);
- }
-
- String name() { return name; }
- String configId() { return configId; }
- String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
- Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
-
- }
-
-
- private static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
- if (clusters.isEmpty())
- throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled.");
-
- return wanted.map(cluster -> {
- if ( ! clusters.containsKey(cluster))
- throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.get(cluster);
- }).orElseGet(() -> {
- if (clusters.size() > 1)
- throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.values().iterator().next();
- });
- }
-
- private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
- List<String> bucketSpaces, Optional<String> bucketSpace) {
- return documentType.map(type -> cluster.bucketOf(type)
- .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
- "' is not mapped to a known bucket space")))
- .or(() -> bucketSpace.map(space -> {
- if ( ! bucketSpaces.contains(space))
- throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
- String.join(", ", bucketSpaces));
- return space;
- }))
- .orElse(FixedBucketSpaces.defaultSpace());
- }
-
-
-
- private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
- return clusters.storage().stream()
- .collect(toUnmodifiableMap(storage -> storage.name(),
- storage -> new StorageCluster(storage.name(),
- storage.configid(),
- buckets.cluster(storage.name())
- .documentType().entrySet().stream()
- .collect(toMap(entry -> entry.getKey(),
- entry -> entry.getValue().bucketSpace())))));
- }
-
-
- // Visible for testing.
- AsyncSession asyncSession() { return asyncSession; }
- Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
- void notifyMaintainers() throws InterruptedException {
- synchronized (throttled) { throttled.notify(); throttled.wait(); }
- synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
- }
-
-}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
deleted file mode 100644
index 96ea6c08f86..00000000000
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ /dev/null
@@ -1,603 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi.resource;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.google.inject.Inject;
-import com.yahoo.cloud.config.ClusterListConfig;
-import com.yahoo.container.core.documentapi.VespaDocumentAccess;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentOperation;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.TestAndSetCondition;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.document.json.JsonReader;
-import com.yahoo.document.json.JsonWriter;
-import com.yahoo.document.json.document.DocumentParser;
-import com.yahoo.document.restapi.DocumentOperationExecutor;
-import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
-import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
-import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
-import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
-import com.yahoo.documentapi.DocumentOperationParameters;
-import com.yahoo.documentapi.metrics.DocumentApiMetrics;
-import com.yahoo.documentapi.metrics.DocumentOperationStatus;
-import com.yahoo.documentapi.metrics.DocumentOperationType;
-import com.yahoo.jdisc.Metric;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.handler.AbstractRequestHandler;
-import com.yahoo.jdisc.handler.CompletionHandler;
-import com.yahoo.jdisc.handler.ContentChannel;
-import com.yahoo.jdisc.handler.ReadableContentChannel;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.jdisc.handler.UnsafeContentInputStream;
-import com.yahoo.container.core.HandlerMetricContextUtil;
-import com.yahoo.jdisc.http.HttpRequest;
-import com.yahoo.jdisc.http.HttpRequest.Method;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.restapi.Path;
-import com.yahoo.slime.Cursor;
-import com.yahoo.slime.Inspector;
-import com.yahoo.slime.Slime;
-import com.yahoo.slime.SlimeUtils;
-import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
-import com.yahoo.yolean.Exceptions;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.logging.Logger;
-
-import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
-import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
-import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
-import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
-import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
-import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
-import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.WARNING;
-import static java.util.stream.Collectors.joining;
-
-/**
- * Asynchronous HTTP handler for /document/v1/
- *
- * @author jonmv
- */
-public class DocumentV1ApiHandler extends AbstractRequestHandler {
-
- private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName());
- private static final Parser<Integer> numberParser = Integer::parseInt;
- private static final Parser<Boolean> booleanParser = Boolean::parseBoolean;
-
- private static final CompletionHandler logException = new CompletionHandler() {
- @Override public void completed() { }
- @Override public void failed(Throwable t) {
- log.log(WARNING, "Exception writing response data", t);
- }
- };
-
- private static final ContentChannel ignoredContent = new ContentChannel() {
- @Override public void write(ByteBuffer buf, CompletionHandler handler) { handler.completed(); }
- @Override public void close(CompletionHandler handler) { handler.completed(); }
- };
-
- private static final String CREATE = "create";
- private static final String CONDITION = "condition";
- private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get
- private static final String FIELD_SET = "fieldSet";
- private static final String SELECTION = "selection";
- private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get
- private static final String CONTINUATION = "continuation";
- private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
- private static final String CONCURRENCY = "concurrency";
- private static final String BUCKET_SPACE = "bucketSpace";
-
- private final Clock clock;
- private final Metric metric; // TODO jonmv: make response class which logs on completion/error
- private final DocumentApiMetrics metrics;
- private final DocumentOperationExecutor executor;
- private final DocumentOperationParser parser;
- private final Map<String, Map<Method, Handler>> handlers;
-
- @Inject
- public DocumentV1ApiHandler(Clock clock,
- Metric metric,
- MetricReceiver metricReceiver,
- VespaDocumentAccess documentAccess,
- DocumentmanagerConfig documentManagerConfig,
- ClusterListConfig clusterListConfig,
- AllClustersBucketSpacesConfig bucketSpacesConfig,
- DocumentOperationExecutorConfig executorConfig) {
- this(clock,
- new DocumentOperationExecutor(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
- new DocumentOperationParser(documentManagerConfig),
- metric,
- metricReceiver);
- }
-
- DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser,
- Metric metric, MetricReceiver metricReceiver) {
- this.clock = clock;
- this.executor = executor;
- this.parser = parser;
- this.metric = metric;
- this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
- this.handlers = defineApi();
- }
-
- @Override
- public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) {
- HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass());
- ResponseHandler responseHandler = response -> {
- HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass());
- return rawResponseHandler.handleResponse(response);
- };
-
- HttpRequest request = (HttpRequest) rawRequest;
- try {
- Path requestPath = new Path(request.getUri());
- for (String path : handlers.keySet())
- if (requestPath.matches(path)) {
- Map<Method, Handler> methods = handlers.get(path);
- if (methods.containsKey(request.getMethod()))
- return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
-
- if (request.getMethod() == OPTIONS)
- return options(methods.keySet(), responseHandler);
-
- return methodNotAllowed(request, methods.keySet(), responseHandler);
- }
- return notFound(request, handlers.keySet(), responseHandler);
- }
- catch (IllegalArgumentException e) {
- return badRequest(request, e, responseHandler);
- }
- catch (RuntimeException e) {
- return serverError(request, e, responseHandler);
- }
- }
-
- @Override
- public void destroy() {
- this.executor.shutdown();
- }
-
- private Map<String, Map<Method, Handler>> defineApi() {
- Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
-
- handlers.put("/document/v1/",
- Map.of(GET, this::getRoot));
-
- handlers.put("/document/v1/{namespace}/{documentType}/docid/",
- Map.of(GET, this::getDocumentType));
-
- handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
- Map.of(GET, this::getDocumentType));
-
- handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
- Map.of(GET, this::getDocumentType));
-
- handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}",
- Map.of(GET, this::getDocument,
- POST, this::postDocument,
- PUT, this::putDocument,
- DELETE, this::deleteDocument));
-
- handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/{docid}",
- Map.of(GET, this::getDocument,
- POST, this::postDocument,
- PUT, this::putDocument,
- DELETE, this::deleteDocument));
-
- handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/{docid}",
- Map.of(GET, this::getDocument,
- POST, this::postDocument,
- PUT, this::putDocument,
- DELETE, this::deleteDocument));
-
- return Collections.unmodifiableMap(handlers);
- }
-
- private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- Cursor documents = root.setArray("documents");
- executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler));
- return ignoredContent;
- }
-
- private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- VisitorOptions.Builder options = parseOptions(request, path);
- options = options.documentType(path.documentType());
- options = options.namespace(path.namespace());
- options = path.group().map(options::group).orElse(options);
- executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler));
- return ignoredContent;
- }
-
- private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) {
- Object monitor = new Object();
- return new VisitOperationsContext((type, message) -> {
- synchronized (monitor) {
- handleError(request, type, message, root, handler);
- }
- },
- token -> {
- token.ifPresent(value -> root.setString("continuation", value));
- synchronized (monitor) {
- respond(root, handler);
- }
- },
- // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway
- document -> {
- synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization.
- SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(),
- documents.addObject());
- }
- });
- }
- private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- DocumentId id = path.id();
- DocumentOperationParameters parameters = parameters();
- parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters);
- parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters);
- executor.get(id,
- parameters,
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- document -> {
- Cursor root = responseRoot(request, id);
- document.map(JsonWriter::toByteArray)
- .map(SlimeUtils::jsonToSlime)
- .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields)")));
- respond(document.isPresent() ? 200 : 404,
- root,
- handler);
- }));
- return ignoredContent;
- }
-
- private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
- ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.PUT, clock.instant());
- return new ForwardingContentChannel(in -> {
- try {
- DocumentPut put = parser.parsePut(in, id.toString());
- getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
- executor.put(put,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- }
- catch (IllegalArgumentException e) {
- badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
- }
- });
- }
-
- private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
- ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.UPDATE, clock.instant());
- return new ForwardingContentChannel(in -> {
- try {
- DocumentUpdate update = parser.parseUpdate(in, id.toString());
- getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
- getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
- executor.update(update,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- }
- catch (IllegalArgumentException e) {
- badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
- }
- });
- }
-
- private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
- ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.REMOVE, clock.instant());
- executor.remove(id,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- return ignoredContent;
- }
-
- private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) {
- switch (type) {
- case BAD_REQUEST:
- badRequest(request, message, root, handler);
- break;
- case NOT_FOUND:
- notFound(request, message, root, handler);
- break;
- case PRECONDITION_FAILED:
- preconditionFailed(request, message, root, handler);
- break;
- case OVERLOAD:
- overload(request, message, root, handler);
- break;
- case TIMEOUT:
- timeout(request, message, root, handler);
- break;
- default:
- log.log(WARNING, "Unexpected error type '" + type + "'");
- case ERROR: // intentional fallthrough
- serverError(request, message, root, handler);
- }
- }
-
- // ------------------------------------------------ Responses ------------------------------------------------
-
- private static Cursor responseRoot(HttpRequest request) {
- Cursor root = new Slime().setObject();
- root.setString("pathId", request.getUri().getRawPath());
- return root;
- }
-
- private static Cursor responseRoot(HttpRequest request, DocumentId id) {
- Cursor root = responseRoot(request);
- root.setString("id", id.toString());
- return root;
- }
-
- private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) {
- Response response = new Response(Response.Status.NO_CONTENT);
- response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
- handler.handleResponse(response).close(logException);
- return ignoredContent;
- }
-
- private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
- return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler);
- }
-
- private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.BAD_REQUEST, root, handler);
- }
-
- private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
- return notFound(request,
- "Nothing at '" + request.getUri().getRawPath() + "'. " +
- "Available paths are:\n" + String.join("\n", paths),
- responseRoot(request),
- handler);
- }
-
- private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- root.setString("message", message);
- return respond(Response.Status.NOT_FOUND, root, handler);
- }
-
- private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- root.setString("message",
- "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
- "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")));
- return respond(Response.Status.METHOD_NOT_ALLOWED,
- root,
- handler);
- }
-
- private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- root.setString("message", message);
- return respond(Response.Status.PRECONDITION_FAILED, root, handler);
- }
-
- private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.TOO_MANY_REQUESTS, root, handler);
- }
-
- private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) {
- log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e);
- Cursor root = responseRoot(request);
- root.setString("message", Exceptions.toMessageString(e));
- return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
- }
-
- private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
- }
-
- private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.GATEWAY_TIMEOUT, root, handler);
- }
-
- private static ContentChannel respond(Inspector root, ResponseHandler handler) {
- return respond(200, root, handler);
- }
-
- private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) {
- Response response = new Response(status);
- response.headers().put("Content-Type", "application/json; charset=UTF-8");
- ContentChannel out = null;
- try {
- out = handler.handleResponse(new Response(status));
- out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException);
- }
- catch (Exception e) {
- log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e));
- }
- finally {
- if (out != null) try {
- out.close(logException);
- }
- catch (Exception e) {
- log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e));
- }
- }
- return ignoredContent;
- }
-
- // ------------------------------------------------ Helpers ------------------------------------------------
-
- private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
- VisitorOptions.Builder options = VisitorOptions.builder();
-
- getProperty(request, SELECTION).ifPresent(options::selection);
- getProperty(request, CONTINUATION).ifPresent(options::continuation);
- getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
- getProperty(request, CLUSTER).ifPresent(options::cluster);
- getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
- getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
- .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
- getProperty(request, CONCURRENCY, numberParser)
- .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
-
- return options;
- }
-
- static class DocumentPath {
-
- private final Path path;
- private final Optional<Group> group;
-
- DocumentPath(Path path) {
- this.path = requireNonNull(path);
- this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
- .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
- }
-
- DocumentId id() {
- return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
- ":" + requireNonNull(path.get("documentType")) +
- ":" + group.map(Group::docIdPart).orElse("") +
- ":" + requireNonNull(path.get("docid")));
- }
-
- String documentType() { return requireNonNull(path.get("documentType")); }
- String namespace() { return requireNonNull(path.get("namespace")); }
- Optional<Group> group() { return group; }
-
- }
-
- private static Optional<String> getProperty(HttpRequest request, String name) {
- List<String> values = request.parameters().get(name);
- if (values != null && values.size() != 0)
- return Optional.ofNullable(values.get(values.size() - 1));
-
- return Optional.empty();
- }
-
- private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
- return getProperty(request, name).map(parser::parse);
- }
-
-
- @FunctionalInterface
- interface Parser<T> extends Function<String, T> {
- default T parse(String value) {
- try {
- return apply(value);
- }
- catch (RuntimeException e) {
- throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
- }
- }
- }
-
-
- @FunctionalInterface
- interface Handler {
- ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
- }
-
-
- /** Readable content channel which forwards data to a reader when closed. */
- static class ForwardingContentChannel implements ContentChannel {
-
- private final ReadableContentChannel delegate = new ReadableContentChannel();
- private final Consumer<InputStream> reader;
-
- public ForwardingContentChannel(Consumer<InputStream> reader) {
- this.reader = reader;
- }
-
- @Override
- public void write(ByteBuffer buf, CompletionHandler handler) {
- delegate.write(buf, handler);
- }
-
- @Override
- public void close(CompletionHandler handler) {
- delegate.close(handler);
- try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) {
- reader.accept(in);
- }
- }
-
- }
-
-
- private static class DocumentOperationParser {
-
- private static final JsonFactory jsonFactory = new JsonFactory();
-
- private final DocumentTypeManager manager;
-
- DocumentOperationParser(DocumentmanagerConfig config) {
- this.manager = new DocumentTypeManager(config);
- }
-
- DocumentPut parsePut(InputStream inputStream, String docId) {
- return (DocumentPut) parse(inputStream, docId, DocumentParser.SupportedOperation.PUT);
- }
-
- DocumentUpdate parseUpdate(InputStream inputStream, String docId) {
- return (DocumentUpdate) parse(inputStream, docId, DocumentParser.SupportedOperation.UPDATE);
- }
-
- private DocumentOperation parse(InputStream inputStream, String docId, DocumentParser.SupportedOperation operation) {
- return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId);
- }
-
- }
-
- private class MeasuringResponseHandler implements ResponseHandler {
-
- private final ResponseHandler delegate;
- private final DocumentOperationType type;
- private final Instant start;
-
- private MeasuringResponseHandler(ResponseHandler delegate, DocumentOperationType type, Instant start) {
- this.delegate = delegate;
- this.type = type;
- this.start = start;
- }
-
- @Override
- public ContentChannel handleResponse(Response response) {
- switch (response.getStatus() / 100) {
- case 2: metrics.reportSuccessful(type, start); break;
- case 4: metrics.reportFailure(type, DocumentOperationStatus.REQUEST_ERROR); break;
- case 5: metrics.reportFailure(type, DocumentOperationStatus.SERVER_ERROR); break;
- }
- return delegate.handleResponse(response);
- }
-
- }
-
-}
diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
deleted file mode 100644
index 19f4f50648b..00000000000
--- a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package=com.yahoo.document.restapi
-
-# Delay before a throttled operation is retried.
-resendDelayMillis int default=100
-
-# Time between a document operation is received and a timeout response is sent
-defaultTimeoutSeconds int default=180
-
-# Time after which a visitor session times out
-visitTimeoutSeconds int default=120
-
-# Bound on number of document operations to keep in retry queue — further operations are rejected
-maxThrottled int default=200
-