summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-29 08:39:35 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 10:23:34 +0200
commit11405e52f2853e44df0944bf7bbee13dc2e617a5 (patch)
tree572e253df6f8c090072da3b211b6973b75f3ef37 /vespaclient-container-plugin
parentbc54f2ad34e2e4737a4de326035fdf00d5729da1 (diff)
Revert "Revert "Jonmv/async doc v1 implementation""
This reverts commit c6aded1606112a54969f56403085ca90d61dac8f.
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/pom.xml6
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java704
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java603
-rw-r--r--vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def15
4 files changed, 1328 insertions, 0 deletions
diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml
index 9c4b81da806..8254c208588 100644
--- a/vespaclient-container-plugin/pom.xml
+++ b/vespaclient-container-plugin/pom.xml
@@ -72,6 +72,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>testutil</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
new file mode 100644
index 00000000000..b675af3b564
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
@@ -0,0 +1,704 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.DocumentResponse;
+import com.yahoo.documentapi.DumpVisitorDataHandler;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.text.Text;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.SEVERE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Encapsulates a document access and supports running asynchronous document
+ * operations and visits against this, with retries and optional timeouts.
+ *
+ * @author jonmv
+ */
+public class DocumentOperationExecutor {
+
+ private static final Logger log = Logger.getLogger(DocumentOperationExecutor.class.getName());
+
+ private final Duration visitTimeout;
+ private final long maxThrottled;
+ private final DocumentAccess access;
+ private final AsyncSession asyncSession;
+ private final Map<String, StorageCluster> clusters;
+ private final Clock clock;
+ private final DelayQueue throttled;
+ private final DelayQueue timeouts;
+ private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
+ private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
+
+ public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
+ DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
+ this(Duration.ofMillis(executorConfig.resendDelayMillis()),
+ Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
+ Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
+ executorConfig.maxThrottled(),
+ access,
+ parseClusters(clustersConfig, bucketsConfig),
+ clock);
+ }
+
+ DocumentOperationExecutor(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
+ DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
+ this.visitTimeout = requireNonNull(visitTimeout);
+ this.maxThrottled = maxThrottled;
+ this.access = requireNonNull(access);
+ this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
+ this.clock = requireNonNull(clock);
+ this.clusters = Map.copyOf(clusters);
+ this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
+ this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
+ }
+
+ /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
+ public void shutdown() {
+ long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
+ Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
+ context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
+ Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
+ context -> context.error(TIMEOUT, "Timed out due to shutdown"));
+ visits.values().forEach(VisitorSession::destroy);
+ try {
+ throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
+ }
+ }
+
+ public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.get(id, parameters), context);
+ }
+
+ public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.put(put, parameters), context);
+ }
+
+ public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.update(update, parameters), context);
+ }
+
+ public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.remove(id, parameters), context);
+ }
+
+ public void visit(VisitorOptions options, VisitOperationsContext context) {
+ try {
+ AtomicBoolean done = new AtomicBoolean(false);
+ VisitorParameters parameters = options.asParameters(clusters, visitTimeout);
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
+ @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
+ });
+ parameters.setControlHandler(new VisitorControlHandler() {
+ @Override public void onDone(CompletionCode code, String message) {
+ super.onDone(code, message);
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets())
+ context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
+ case SUCCESS: // intentional fallthrough
+ case ABORTED:
+ context.success(Optional.ofNullable(getProgress())
+ .filter(progress -> ! progress.isFinished())
+ .map(ProgressToken::serializeToString));
+ break;
+ default:
+ context.error(ERROR, message != null ? message : "Visiting failed");
+ }
+ done.set(true); // This may be reached before dispatching thread is done putting us in the map.
+ visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
+ }
+ });
+ visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ if (done.get())
+ visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
+ }
+ catch (IllegalArgumentException | ParseException e) {
+ context.error(BAD_REQUEST, Exceptions.toMessageString(e));
+ }
+ catch (RuntimeException e) {
+ context.error(ERROR, Exceptions.toMessageString(e));
+ }
+ }
+
+ public String routeToCluster(String cluster) {
+ return resolveCluster(Optional.of(cluster), clusters).route();
+ }
+
+
+ public enum ErrorType {
+ OVERLOAD,
+ NOT_FOUND,
+ PRECONDITION_FAILED,
+ BAD_REQUEST,
+ TIMEOUT,
+ ERROR;
+ }
+
+
+ /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
+ public static class VisitOperationsContext extends Context<Optional<String>> {
+
+ private final Consumer<Document> onDocument;
+
+ public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) {
+ super(onError, onSuccess);
+ this.onDocument = onDocument;
+ }
+
+ void document(Document document) {
+ if ( ! handled())
+ onDocument.accept(document);
+ }
+
+ }
+
+
+ /** Context for a document operation. */
+ public static class OperationContext extends Context<Optional<Document>> {
+
+ public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
+ super(onError, onSuccess);
+ }
+
+ }
+
+
+ public static class VisitorOptions {
+
+ private final Optional<String> cluster;
+ private final Optional<String> namespace;
+ private final Optional<String> documentType;
+ private final Optional<Group> group;
+ private final Optional<String> selection;
+ private final Optional<String> fieldSet;
+ private final Optional<String> continuation;
+ private final Optional<String> bucketSpace;
+ private final Optional<Integer> wantedDocumentCount;
+ private final Optional<Integer> concurrency;
+
+ private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
+ Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
+ Optional<String> continuation,Optional<String> bucketSpace,
+ Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
+ this.cluster = cluster;
+ this.namespace = namespace;
+ this.documentType = documentType;
+ this.group = group;
+ this.selection = selection;
+ this.fieldSet = fieldSet;
+ this.continuation = continuation;
+ this.bucketSpace = bucketSpace;
+ this.wantedDocumentCount = wantedDocumentCount;
+ this.concurrency = concurrency;
+ }
+
+ private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) {
+ if (cluster.isEmpty() && documentType.isEmpty())
+ throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+
+ VisitorParameters parameters = new VisitorParameters(Stream.of(selection,
+ documentType,
+ namespace.map(value -> "id.namespace=='" + value + "'"),
+ group.map(Group::selection))
+ .flatMap(Optional::stream)
+ .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
+ StringJoiner::add,
+ StringJoiner::merge)
+ .toString());
+
+ continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
+ parameters.setFieldSet(fieldSet.orElse(documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
+ wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
+ parameters.setMaxTotalHits(wantedDocumentCount.orElse(1 << 10));
+ concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency.orElse(1)));
+ parameters.setTimeoutMs(visitTimeout.toMillis());
+ parameters.visitInconsistentBuckets(true);
+ parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
+
+ StorageCluster storageCluster = resolveCluster(cluster, clusters);
+ parameters.setRoute(storageCluster.route());
+ parameters.setBucketSpace(resolveBucket(storageCluster,
+ documentType,
+ List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
+ bucketSpace));
+
+ return parameters;
+ }
+
+ public static Builder builder() { return new Builder(); }
+
+
+ public static class Builder {
+
+ private String cluster;
+ private String documentType;
+ private String namespace;
+ private Group group;
+ private String selection;
+ private String fieldSet;
+ private String continuation;
+ private String bucketSpace;
+ private Integer wantedDocumentCount;
+ private Integer concurrency;
+
+ public Builder cluster(String cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+
+ public Builder documentType(String documentType) {
+ this.documentType = documentType;
+ return this;
+ }
+
+ public Builder namespace(String namespace) {
+ this.namespace = namespace;
+ return this;
+ }
+
+ public Builder group(Group group) {
+ this.group = group;
+ return this;
+ }
+
+ public Builder selection(String selection) {
+ this.selection = selection;
+ return this;
+ }
+
+ public Builder fieldSet(String fieldSet) {
+ this.fieldSet = fieldSet;
+ return this;
+ }
+
+ public Builder continuation(String continuation) {
+ this.continuation = continuation;
+ return this;
+ }
+
+ public Builder bucketSpace(String bucketSpace) {
+ this.bucketSpace = bucketSpace;
+ return this;
+ }
+
+ public Builder wantedDocumentCount(Integer wantedDocumentCount) {
+ this.wantedDocumentCount = wantedDocumentCount;
+ return this;
+ }
+
+ public Builder concurrency(Integer concurrency) {
+ this.concurrency = concurrency;
+ return this;
+ }
+
+ public VisitorOptions build() {
+ return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
+ Optional.ofNullable(namespace), Optional.ofNullable(group),
+ Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
+ Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
+ Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
+ }
+
+ }
+
+ }
+
+
+ public static class Group {
+
+ private final String value;
+ private final String docIdPart;
+ private final String selection;
+
+ private Group(String value, String docIdPart, String selection) {
+ Text.validateTextString(value)
+ .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
+ this.value = value;
+ this.docIdPart = docIdPart;
+ this.selection = selection;
+ }
+
+ public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
+ public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
+
+ public String value() { return value; }
+ public String docIdPart() { return docIdPart; }
+ public String selection() { return selection; }
+
+ }
+
+
+ /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
+ private void accept(Supplier<Result> operation, OperationContext context) {
+ timeouts.add(operation, context);
+ send(operation, context);
+ }
+
+ /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */
+ private void send(Supplier<Result> operation, OperationContext context) {
+ Result result = operation.get();
+ switch (result.type()) {
+ case SUCCESS:
+ outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
+ break;
+ case TRANSIENT_ERROR:
+ if ( ! throttled.add(operation, context))
+ context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
+ break;
+ default:
+ log.log(WARNING, "Unknown result type '" + result.type() + "'");
+ case FATAL_ERROR: // intentional fallthrough
+ context.error(ERROR, result.getError().getMessage());
+ }
+ }
+
+ private void handle(Response response) {
+ outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
+ }
+
+ private static ErrorType toErrorType(Response.Outcome outcome) {
+ switch (outcome) {
+ case NOT_FOUND:
+ return NOT_FOUND;
+ case CONDITION_FAILED:
+ return PRECONDITION_FAILED;
+ default:
+ log.log(WARNING, "Unexpected response outcome: " + outcome);
+ case ERROR: // intentional fallthrough
+ return ERROR;
+ }
+ }
+
+
+ /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
+ private static class Context<T> {
+
+ private final AtomicBoolean handled = new AtomicBoolean();
+ private final BiConsumer<ErrorType, String> onError;
+ private final Consumer<T> onSuccess;
+
+ Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
+ this.onError = onError;
+ this.onSuccess = onSuccess;
+ }
+
+ void error(ErrorType type, String message) {
+ if ( ! handled.getAndSet(true))
+ onError.accept(type, message);
+ }
+
+ void success(T result) {
+ if ( ! handled.getAndSet(true))
+ onSuccess.accept(result);
+ }
+
+ boolean handled() {
+ return handled.get();
+ }
+
+ }
+
+
+ private static class Completion {
+
+ private final OperationContext context;
+ private final Response response;
+
+ private Completion(OperationContext context, Response response) {
+ this.context = context;
+ this.response = response;
+ }
+
+ static Completion of(OperationContext context) {
+ return new Completion(requireNonNull(context), null);
+ }
+
+ static Completion of(Response response) {
+ return new Completion(null, requireNonNull(response));
+ }
+
+ Completion merge(Completion other) {
+ if (context == null)
+ complete(other.context, response);
+ else
+ complete(context, other.response);
+ return null;
+ }
+
+ private void complete(OperationContext context, Response response) {
+ if (response.isSuccess())
+ context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
+ : Optional.empty());
+ else
+ context.error(toErrorType(response.outcome()), response.getTextMessage());
+ }
+
+ }
+
+
+ /**
+ * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
+ *
+ * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
+ * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
+ */
+ static class DelayQueue {
+
+ private final long maxSize;
+ private final Clock clock;
+ private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
+ private final AtomicLong size = new AtomicLong(0);
+ private final Thread maintainer;
+ private final Duration delay;
+ private final long defaultWaitMillis;
+
+ public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
+ if (maxSize < 0)
+ throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
+ if (delay.isNegative())
+ throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
+
+ this.maxSize = maxSize;
+ this.delay = delay;
+ this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
+ this.clock = requireNonNull(clock);
+ this.maintainer = new Thread(() -> maintain(action));
+ this.maintainer.start();
+ }
+
+ boolean add(Supplier<Result> operation, OperationContext context) {
+ if (size.incrementAndGet() > maxSize) {
+ size.decrementAndGet();
+ return false;
+ }
+ return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
+ }
+
+ long size() { return size.get(); }
+
+ Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
+ ExecutorService shutdownService = Executors.newSingleThreadExecutor();
+ Future<?> future = shutdownService.submit(() -> {
+ try {
+ Thread.sleep(grace.toMillis());
+ }
+ finally {
+ maintainer.interrupt();
+ for (Delayed delayed; (delayed = queue.poll()) != null; ) {
+ size.decrementAndGet();
+ onShutdown.accept(delayed.context());
+ }
+ }
+ return null;
+ });
+ shutdownService.shutdown();
+ return future;
+ }
+
+ /**
+ * Repeatedly loops through the queue, evicting already handled entries and processing those
+ * which have become ready since last time, then waits until new items are guaranteed to be ready,
+ * or until it's time for a new run just to ensure GC of handled entries.
+ * The entries are assumed to always be added to the back of the queue, with the same delay.
+ * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
+ * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
+ */
+ private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) {
+ while ( ! Thread.currentThread().isInterrupted()) {
+ try {
+ Instant waitUntil = null;
+ Iterator<Delayed> operations = queue.iterator();
+ while (operations.hasNext()) {
+ Delayed delayed = operations.next();
+ // Already handled: remove and continue.
+ if (delayed.context().handled()) {
+ operations.remove();
+ size.decrementAndGet();
+ continue;
+ }
+ // Ready for action: remove from queue and run.
+ if (delayed.readyAt().isBefore(clock.instant())) {
+ operations.remove();
+ size.decrementAndGet();
+ action.accept(delayed.operation(), delayed.context());
+ continue;
+ }
+ // Not yet ready for action: keep time to wake up again.
+ waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
+ }
+ long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
+ synchronized (this) {
+ do {
+ notify();
+ wait(Math.max(0, waitUntilMillis - clock.millis()));
+ }
+ while (clock.millis() < waitUntilMillis);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (Exception e) {
+ log.log(SEVERE, "Exception caught by delay queue maintainer", e);
+ }
+ }
+ }
+ }
+
+
+ private static class Delayed {
+
+ private final Supplier<Result> operation;
+ private final OperationContext context;
+ private final Instant readyAt;
+
+ Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
+ this.readyAt = requireNonNull(readyAt);
+ this.context = requireNonNull(context);
+ this.operation = requireNonNull(operation);
+ }
+
+ Supplier<Result> operation() { return operation; }
+ OperationContext context() { return context; }
+ Instant readyAt() { return readyAt; }
+
+ }
+
+
+ static class StorageCluster {
+
+ private final String name;
+ private final String configId;
+ private final Map<String, String> documentBuckets;
+
+ StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
+ this.name = requireNonNull(name);
+ this.configId = requireNonNull(configId);
+ this.documentBuckets = Map.copyOf(documentBuckets);
+ }
+
+ String name() { return name; }
+ String configId() { return configId; }
+ String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
+ Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
+
+ }
+
+
+ private static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
+ if (clusters.isEmpty())
+ throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled.");
+
+ return wanted.map(cluster -> {
+ if ( ! clusters.containsKey(cluster))
+ throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.get(cluster);
+ }).orElseGet(() -> {
+ if (clusters.size() > 1)
+ throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.values().iterator().next();
+ });
+ }
+
+ private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
+ List<String> bucketSpaces, Optional<String> bucketSpace) {
+ return documentType.map(type -> cluster.bucketOf(type)
+ .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
+ "' is not mapped to a known bucket space")))
+ .or(() -> bucketSpace.map(space -> {
+ if ( ! bucketSpaces.contains(space))
+ throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
+ String.join(", ", bucketSpaces));
+ return space;
+ }))
+ .orElse(FixedBucketSpaces.defaultSpace());
+ }
+
+
+
+ private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
+ return clusters.storage().stream()
+ .collect(toUnmodifiableMap(storage -> storage.name(),
+ storage -> new StorageCluster(storage.name(),
+ storage.configid(),
+ buckets.cluster(storage.name())
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> entry.getKey(),
+ entry -> entry.getValue().bucketSpace())))));
+ }
+
+
+ // Visible for testing.
+ AsyncSession asyncSession() { return asyncSession; }
+ Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
+ void notifyMaintainers() throws InterruptedException {
+ synchronized (throttled) { throttled.notify(); throttled.wait(); }
+ synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
new file mode 100644
index 00000000000..96ea6c08f86
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -0,0 +1,603 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi.resource;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.container.core.documentapi.VespaDocumentAccess;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.json.JsonReader;
+import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.json.document.DocumentParser;
+import com.yahoo.document.restapi.DocumentOperationExecutor;
+import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
+import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
+import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.documentapi.metrics.DocumentOperationStatus;
+import com.yahoo.documentapi.metrics.DocumentOperationType;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ReadableContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.handler.UnsafeContentInputStream;
+import com.yahoo.container.core.HandlerMetricContextUtil;
+import com.yahoo.jdisc.http.HttpRequest;
+import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.restapi.Path;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.logging.Logger;
+
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
+import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
+import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
+import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
+import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Asynchronous HTTP handler for /document/v1/
+ *
+ * @author jonmv
+ */
+public class DocumentV1ApiHandler extends AbstractRequestHandler {
+
+ private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName());
+ private static final Parser<Integer> numberParser = Integer::parseInt;
+ private static final Parser<Boolean> booleanParser = Boolean::parseBoolean;
+
+ private static final CompletionHandler logException = new CompletionHandler() {
+ @Override public void completed() { }
+ @Override public void failed(Throwable t) {
+ log.log(WARNING, "Exception writing response data", t);
+ }
+ };
+
+ private static final ContentChannel ignoredContent = new ContentChannel() {
+ @Override public void write(ByteBuffer buf, CompletionHandler handler) { handler.completed(); }
+ @Override public void close(CompletionHandler handler) { handler.completed(); }
+ };
+
+ private static final String CREATE = "create";
+ private static final String CONDITION = "condition";
+ private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get
+ private static final String FIELD_SET = "fieldSet";
+ private static final String SELECTION = "selection";
+ private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get
+ private static final String CONTINUATION = "continuation";
+ private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
+ private static final String CONCURRENCY = "concurrency";
+ private static final String BUCKET_SPACE = "bucketSpace";
+
+ private final Clock clock;
+ private final Metric metric; // TODO jonmv: make response class which logs on completion/error
+ private final DocumentApiMetrics metrics;
+ private final DocumentOperationExecutor executor;
+ private final DocumentOperationParser parser;
+ private final Map<String, Map<Method, Handler>> handlers;
+
+ @Inject
+ public DocumentV1ApiHandler(Clock clock,
+ Metric metric,
+ MetricReceiver metricReceiver,
+ VespaDocumentAccess documentAccess,
+ DocumentmanagerConfig documentManagerConfig,
+ ClusterListConfig clusterListConfig,
+ AllClustersBucketSpacesConfig bucketSpacesConfig,
+ DocumentOperationExecutorConfig executorConfig) {
+ this(clock,
+ new DocumentOperationExecutor(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
+ new DocumentOperationParser(documentManagerConfig),
+ metric,
+ metricReceiver);
+ }
+
+ DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser,
+ Metric metric, MetricReceiver metricReceiver) {
+ this.clock = clock;
+ this.executor = executor;
+ this.parser = parser;
+ this.metric = metric;
+ this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+ this.handlers = defineApi();
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) {
+ HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass());
+ ResponseHandler responseHandler = response -> {
+ HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass());
+ return rawResponseHandler.handleResponse(response);
+ };
+
+ HttpRequest request = (HttpRequest) rawRequest;
+ try {
+ Path requestPath = new Path(request.getUri());
+ for (String path : handlers.keySet())
+ if (requestPath.matches(path)) {
+ Map<Method, Handler> methods = handlers.get(path);
+ if (methods.containsKey(request.getMethod()))
+ return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
+
+ if (request.getMethod() == OPTIONS)
+ return options(methods.keySet(), responseHandler);
+
+ return methodNotAllowed(request, methods.keySet(), responseHandler);
+ }
+ return notFound(request, handlers.keySet(), responseHandler);
+ }
+ catch (IllegalArgumentException e) {
+ return badRequest(request, e, responseHandler);
+ }
+ catch (RuntimeException e) {
+ return serverError(request, e, responseHandler);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ this.executor.shutdown();
+ }
+
+ private Map<String, Map<Method, Handler>> defineApi() {
+ Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
+
+ handlers.put("/document/v1/",
+ Map.of(GET, this::getRoot));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ return Collections.unmodifiableMap(handlers);
+ }
+
+ private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ Cursor documents = root.setArray("documents");
+ executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ VisitorOptions.Builder options = parseOptions(request, path);
+ options = options.documentType(path.documentType());
+ options = options.namespace(path.namespace());
+ options = path.group().map(options::group).orElse(options);
+ executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) {
+ Object monitor = new Object();
+ return new VisitOperationsContext((type, message) -> {
+ synchronized (monitor) {
+ handleError(request, type, message, root, handler);
+ }
+ },
+ token -> {
+ token.ifPresent(value -> root.setString("continuation", value));
+ synchronized (monitor) {
+ respond(root, handler);
+ }
+ },
+ // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway
+ document -> {
+ synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization.
+ SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(),
+ documents.addObject());
+ }
+ });
+ }
+ private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ DocumentId id = path.id();
+ DocumentOperationParameters parameters = parameters();
+ parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters);
+ parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters);
+ executor.get(id,
+ parameters,
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ document -> {
+ Cursor root = responseRoot(request, id);
+ document.map(JsonWriter::toByteArray)
+ .map(SlimeUtils::jsonToSlime)
+ .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields)")));
+ respond(document.isPresent() ? 200 : 404,
+ root,
+ handler);
+ }));
+ return ignoredContent;
+ }
+
+ private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.PUT, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentPut put = parser.parsePut(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
+ executor.put(put,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
+ }
+ });
+ }
+
+ private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.UPDATE, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentUpdate update = parser.parseUpdate(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
+ getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
+ executor.update(update,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
+ }
+ });
+ }
+
+ private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.REMOVE, clock.instant());
+ executor.remove(id,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ return ignoredContent;
+ }
+
+ private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) {
+ switch (type) {
+ case BAD_REQUEST:
+ badRequest(request, message, root, handler);
+ break;
+ case NOT_FOUND:
+ notFound(request, message, root, handler);
+ break;
+ case PRECONDITION_FAILED:
+ preconditionFailed(request, message, root, handler);
+ break;
+ case OVERLOAD:
+ overload(request, message, root, handler);
+ break;
+ case TIMEOUT:
+ timeout(request, message, root, handler);
+ break;
+ default:
+ log.log(WARNING, "Unexpected error type '" + type + "'");
+ case ERROR: // intentional fallthrough
+ serverError(request, message, root, handler);
+ }
+ }
+
+ // ------------------------------------------------ Responses ------------------------------------------------
+
+ private static Cursor responseRoot(HttpRequest request) {
+ Cursor root = new Slime().setObject();
+ root.setString("pathId", request.getUri().getRawPath());
+ return root;
+ }
+
+ private static Cursor responseRoot(HttpRequest request, DocumentId id) {
+ Cursor root = responseRoot(request);
+ root.setString("id", id.toString());
+ return root;
+ }
+
+ private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) {
+ Response response = new Response(Response.Status.NO_CONTENT);
+ response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
+ handler.handleResponse(response).close(logException);
+ return ignoredContent;
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
+ return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler);
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.BAD_REQUEST, root, handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
+ return notFound(request,
+ "Nothing at '" + request.getUri().getRawPath() + "'. " +
+ "Available paths are:\n" + String.join("\n", paths),
+ responseRoot(request),
+ handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.NOT_FOUND, root, handler);
+ }
+
+ private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ root.setString("message",
+ "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
+ "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")));
+ return respond(Response.Status.METHOD_NOT_ALLOWED,
+ root,
+ handler);
+ }
+
+ private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.PRECONDITION_FAILED, root, handler);
+ }
+
+ private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.TOO_MANY_REQUESTS, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e);
+ Cursor root = responseRoot(request);
+ root.setString("message", Exceptions.toMessageString(e));
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.GATEWAY_TIMEOUT, root, handler);
+ }
+
+ private static ContentChannel respond(Inspector root, ResponseHandler handler) {
+ return respond(200, root, handler);
+ }
+
+ private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) {
+ Response response = new Response(status);
+ response.headers().put("Content-Type", "application/json; charset=UTF-8");
+ ContentChannel out = null;
+ try {
+ out = handler.handleResponse(new Response(status));
+ out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ finally {
+ if (out != null) try {
+ out.close(logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ }
+ return ignoredContent;
+ }
+
+ // ------------------------------------------------ Helpers ------------------------------------------------
+
+ private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
+ VisitorOptions.Builder options = VisitorOptions.builder();
+
+ getProperty(request, SELECTION).ifPresent(options::selection);
+ getProperty(request, CONTINUATION).ifPresent(options::continuation);
+ getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
+ getProperty(request, CLUSTER).ifPresent(options::cluster);
+ getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
+ getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
+ .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
+ getProperty(request, CONCURRENCY, numberParser)
+ .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
+
+ return options;
+ }
+
+ static class DocumentPath {
+
+ private final Path path;
+ private final Optional<Group> group;
+
+ DocumentPath(Path path) {
+ this.path = requireNonNull(path);
+ this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
+ .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
+ }
+
+ DocumentId id() {
+ return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
+ ":" + requireNonNull(path.get("documentType")) +
+ ":" + group.map(Group::docIdPart).orElse("") +
+ ":" + requireNonNull(path.get("docid")));
+ }
+
+ String documentType() { return requireNonNull(path.get("documentType")); }
+ String namespace() { return requireNonNull(path.get("namespace")); }
+ Optional<Group> group() { return group; }
+
+ }
+
+ private static Optional<String> getProperty(HttpRequest request, String name) {
+ List<String> values = request.parameters().get(name);
+ if (values != null && values.size() != 0)
+ return Optional.ofNullable(values.get(values.size() - 1));
+
+ return Optional.empty();
+ }
+
+ private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
+ return getProperty(request, name).map(parser::parse);
+ }
+
+
+ @FunctionalInterface
+ interface Parser<T> extends Function<String, T> {
+ default T parse(String value) {
+ try {
+ return apply(value);
+ }
+ catch (RuntimeException e) {
+ throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
+ }
+ }
+ }
+
+
+ @FunctionalInterface
+ interface Handler {
+ ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
+ }
+
+
+ /** Readable content channel which forwards data to a reader when closed. */
+ static class ForwardingContentChannel implements ContentChannel {
+
+ private final ReadableContentChannel delegate = new ReadableContentChannel();
+ private final Consumer<InputStream> reader;
+
+ public ForwardingContentChannel(Consumer<InputStream> reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ delegate.write(buf, handler);
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ delegate.close(handler);
+ try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) {
+ reader.accept(in);
+ }
+ }
+
+ }
+
+
+ private static class DocumentOperationParser {
+
+ private static final JsonFactory jsonFactory = new JsonFactory();
+
+ private final DocumentTypeManager manager;
+
+ DocumentOperationParser(DocumentmanagerConfig config) {
+ this.manager = new DocumentTypeManager(config);
+ }
+
+ DocumentPut parsePut(InputStream inputStream, String docId) {
+ return (DocumentPut) parse(inputStream, docId, DocumentParser.SupportedOperation.PUT);
+ }
+
+ DocumentUpdate parseUpdate(InputStream inputStream, String docId) {
+ return (DocumentUpdate) parse(inputStream, docId, DocumentParser.SupportedOperation.UPDATE);
+ }
+
+ private DocumentOperation parse(InputStream inputStream, String docId, DocumentParser.SupportedOperation operation) {
+ return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId);
+ }
+
+ }
+
+ private class MeasuringResponseHandler implements ResponseHandler {
+
+ private final ResponseHandler delegate;
+ private final DocumentOperationType type;
+ private final Instant start;
+
+ private MeasuringResponseHandler(ResponseHandler delegate, DocumentOperationType type, Instant start) {
+ this.delegate = delegate;
+ this.type = type;
+ this.start = start;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ switch (response.getStatus() / 100) {
+ case 2: metrics.reportSuccessful(type, start); break;
+ case 4: metrics.reportFailure(type, DocumentOperationStatus.REQUEST_ERROR); break;
+ case 5: metrics.reportFailure(type, DocumentOperationStatus.SERVER_ERROR); break;
+ }
+ return delegate.handleResponse(response);
+ }
+
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
new file mode 100644
index 00000000000..19f4f50648b
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
@@ -0,0 +1,15 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package=com.yahoo.document.restapi
+
+# Delay before a throttled operation is retried.
+resendDelayMillis int default=100
+
+# Time between a document operation is received and a timeout response is sent
+defaultTimeoutSeconds int default=180
+
+# Time after which a visitor session times out
+visitTimeoutSeconds int default=120
+
+# Bound on number of document operations to keep in retry queue — further operations are rejected
+maxThrottled int default=200
+