summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-09-29 08:10:05 +0200
committerGitHub <noreply@github.com>2020-09-29 08:10:05 +0200
commitc6aded1606112a54969f56403085ca90d61dac8f (patch)
treedb29615090e57241998ec0deb1c55a49632c3623 /vespaclient-container-plugin
parent09bf1d5f22a7ae98191c94e9be591994b5125557 (diff)
Revert "Jonmv/async doc v1 implementation"
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/pom.xml6
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java704
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java603
-rw-r--r--vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def15
4 files changed, 0 insertions, 1328 deletions
diff --git a/vespaclient-container-plugin/pom.xml b/vespaclient-container-plugin/pom.xml
index 8254c208588..9c4b81da806 100644
--- a/vespaclient-container-plugin/pom.xml
+++ b/vespaclient-container-plugin/pom.xml
@@ -72,12 +72,6 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>com.yahoo.vespa</groupId>
- <artifactId>testutil</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
<plugins>
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
deleted file mode 100644
index b675af3b564..00000000000
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
+++ /dev/null
@@ -1,704 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi;
-
-import com.yahoo.cloud.config.ClusterListConfig;
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.FixedBucketSpaces;
-import com.yahoo.document.fieldset.AllFields;
-import com.yahoo.document.select.parser.ParseException;
-import com.yahoo.documentapi.AsyncParameters;
-import com.yahoo.documentapi.AsyncSession;
-import com.yahoo.documentapi.DocumentAccess;
-import com.yahoo.documentapi.DocumentOperationParameters;
-import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
-import com.yahoo.documentapi.ProgressToken;
-import com.yahoo.documentapi.Response;
-import com.yahoo.documentapi.Result;
-import com.yahoo.documentapi.VisitorControlHandler;
-import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.messagebus.StaticThrottlePolicy;
-import com.yahoo.text.Text;
-import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
-import com.yahoo.yolean.Exceptions;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import java.util.logging.Logger;
-import java.util.stream.Stream;
-
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
-import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.SEVERE;
-import static java.util.logging.Level.WARNING;
-import static java.util.stream.Collectors.toMap;
-import static java.util.stream.Collectors.toUnmodifiableMap;
-
-/**
- * Encapsulates a document access and supports running asynchronous document
- * operations and visits against this, with retries and optional timeouts.
- *
- * @author jonmv
- */
-public class DocumentOperationExecutor {
-
- private static final Logger log = Logger.getLogger(DocumentOperationExecutor.class.getName());
-
- private final Duration visitTimeout;
- private final long maxThrottled;
- private final DocumentAccess access;
- private final AsyncSession asyncSession;
- private final Map<String, StorageCluster> clusters;
- private final Clock clock;
- private final DelayQueue throttled;
- private final DelayQueue timeouts;
- private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
- private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
-
- public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
- DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
- this(Duration.ofMillis(executorConfig.resendDelayMillis()),
- Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
- Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
- executorConfig.maxThrottled(),
- access,
- parseClusters(clustersConfig, bucketsConfig),
- clock);
- }
-
- DocumentOperationExecutor(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
- DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
- this.visitTimeout = requireNonNull(visitTimeout);
- this.maxThrottled = maxThrottled;
- this.access = requireNonNull(access);
- this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
- this.clock = requireNonNull(clock);
- this.clusters = Map.copyOf(clusters);
- this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
- this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
- }
-
- /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
- public void shutdown() {
- long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
- Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
- context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
- Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
- context -> context.error(TIMEOUT, "Timed out due to shutdown"));
- visits.values().forEach(VisitorSession::destroy);
- try {
- throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- }
- catch (Exception e) {
- log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
- }
- }
-
- public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.get(id, parameters), context);
- }
-
- public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.put(put, parameters), context);
- }
-
- public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.update(update, parameters), context);
- }
-
- public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.remove(id, parameters), context);
- }
-
- public void visit(VisitorOptions options, VisitOperationsContext context) {
- try {
- AtomicBoolean done = new AtomicBoolean(false);
- VisitorParameters parameters = options.asParameters(clusters, visitTimeout);
- parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
- @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
- @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
- });
- parameters.setControlHandler(new VisitorControlHandler() {
- @Override public void onDone(CompletionCode code, String message) {
- super.onDone(code, message);
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets())
- context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
- case SUCCESS: // intentional fallthrough
- case ABORTED:
- context.success(Optional.ofNullable(getProgress())
- .filter(progress -> ! progress.isFinished())
- .map(ProgressToken::serializeToString));
- break;
- default:
- context.error(ERROR, message != null ? message : "Visiting failed");
- }
- done.set(true); // This may be reached before dispatching thread is done putting us in the map.
- visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
- }
- });
- visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
- if (done.get())
- visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
- }
- catch (IllegalArgumentException | ParseException e) {
- context.error(BAD_REQUEST, Exceptions.toMessageString(e));
- }
- catch (RuntimeException e) {
- context.error(ERROR, Exceptions.toMessageString(e));
- }
- }
-
- public String routeToCluster(String cluster) {
- return resolveCluster(Optional.of(cluster), clusters).route();
- }
-
-
- public enum ErrorType {
- OVERLOAD,
- NOT_FOUND,
- PRECONDITION_FAILED,
- BAD_REQUEST,
- TIMEOUT,
- ERROR;
- }
-
-
- /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
- public static class VisitOperationsContext extends Context<Optional<String>> {
-
- private final Consumer<Document> onDocument;
-
- public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) {
- super(onError, onSuccess);
- this.onDocument = onDocument;
- }
-
- void document(Document document) {
- if ( ! handled())
- onDocument.accept(document);
- }
-
- }
-
-
- /** Context for a document operation. */
- public static class OperationContext extends Context<Optional<Document>> {
-
- public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
- super(onError, onSuccess);
- }
-
- }
-
-
- public static class VisitorOptions {
-
- private final Optional<String> cluster;
- private final Optional<String> namespace;
- private final Optional<String> documentType;
- private final Optional<Group> group;
- private final Optional<String> selection;
- private final Optional<String> fieldSet;
- private final Optional<String> continuation;
- private final Optional<String> bucketSpace;
- private final Optional<Integer> wantedDocumentCount;
- private final Optional<Integer> concurrency;
-
- private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
- Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
- Optional<String> continuation,Optional<String> bucketSpace,
- Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
- this.cluster = cluster;
- this.namespace = namespace;
- this.documentType = documentType;
- this.group = group;
- this.selection = selection;
- this.fieldSet = fieldSet;
- this.continuation = continuation;
- this.bucketSpace = bucketSpace;
- this.wantedDocumentCount = wantedDocumentCount;
- this.concurrency = concurrency;
- }
-
- private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) {
- if (cluster.isEmpty() && documentType.isEmpty())
- throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
-
- VisitorParameters parameters = new VisitorParameters(Stream.of(selection,
- documentType,
- namespace.map(value -> "id.namespace=='" + value + "'"),
- group.map(Group::selection))
- .flatMap(Optional::stream)
- .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
- StringJoiner::add,
- StringJoiner::merge)
- .toString());
-
- continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
- parameters.setFieldSet(fieldSet.orElse(documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
- wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
- parameters.setMaxTotalHits(wantedDocumentCount.orElse(1 << 10));
- concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
- parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency.orElse(1)));
- parameters.setTimeoutMs(visitTimeout.toMillis());
- parameters.visitInconsistentBuckets(true);
- parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
-
- StorageCluster storageCluster = resolveCluster(cluster, clusters);
- parameters.setRoute(storageCluster.route());
- parameters.setBucketSpace(resolveBucket(storageCluster,
- documentType,
- List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
- bucketSpace));
-
- return parameters;
- }
-
- public static Builder builder() { return new Builder(); }
-
-
- public static class Builder {
-
- private String cluster;
- private String documentType;
- private String namespace;
- private Group group;
- private String selection;
- private String fieldSet;
- private String continuation;
- private String bucketSpace;
- private Integer wantedDocumentCount;
- private Integer concurrency;
-
- public Builder cluster(String cluster) {
- this.cluster = cluster;
- return this;
- }
-
- public Builder documentType(String documentType) {
- this.documentType = documentType;
- return this;
- }
-
- public Builder namespace(String namespace) {
- this.namespace = namespace;
- return this;
- }
-
- public Builder group(Group group) {
- this.group = group;
- return this;
- }
-
- public Builder selection(String selection) {
- this.selection = selection;
- return this;
- }
-
- public Builder fieldSet(String fieldSet) {
- this.fieldSet = fieldSet;
- return this;
- }
-
- public Builder continuation(String continuation) {
- this.continuation = continuation;
- return this;
- }
-
- public Builder bucketSpace(String bucketSpace) {
- this.bucketSpace = bucketSpace;
- return this;
- }
-
- public Builder wantedDocumentCount(Integer wantedDocumentCount) {
- this.wantedDocumentCount = wantedDocumentCount;
- return this;
- }
-
- public Builder concurrency(Integer concurrency) {
- this.concurrency = concurrency;
- return this;
- }
-
- public VisitorOptions build() {
- return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
- Optional.ofNullable(namespace), Optional.ofNullable(group),
- Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
- Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
- Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
- }
-
- }
-
- }
-
-
- public static class Group {
-
- private final String value;
- private final String docIdPart;
- private final String selection;
-
- private Group(String value, String docIdPart, String selection) {
- Text.validateTextString(value)
- .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
- this.value = value;
- this.docIdPart = docIdPart;
- this.selection = selection;
- }
-
- public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
- public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
-
- public String value() { return value; }
- public String docIdPart() { return docIdPart; }
- public String selection() { return selection; }
-
- }
-
-
- /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
- private void accept(Supplier<Result> operation, OperationContext context) {
- timeouts.add(operation, context);
- send(operation, context);
- }
-
- /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */
- private void send(Supplier<Result> operation, OperationContext context) {
- Result result = operation.get();
- switch (result.type()) {
- case SUCCESS:
- outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
- break;
- case TRANSIENT_ERROR:
- if ( ! throttled.add(operation, context))
- context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
- break;
- default:
- log.log(WARNING, "Unknown result type '" + result.type() + "'");
- case FATAL_ERROR: // intentional fallthrough
- context.error(ERROR, result.getError().getMessage());
- }
- }
-
- private void handle(Response response) {
- outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
- }
-
- private static ErrorType toErrorType(Response.Outcome outcome) {
- switch (outcome) {
- case NOT_FOUND:
- return NOT_FOUND;
- case CONDITION_FAILED:
- return PRECONDITION_FAILED;
- default:
- log.log(WARNING, "Unexpected response outcome: " + outcome);
- case ERROR: // intentional fallthrough
- return ERROR;
- }
- }
-
-
- /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
- private static class Context<T> {
-
- private final AtomicBoolean handled = new AtomicBoolean();
- private final BiConsumer<ErrorType, String> onError;
- private final Consumer<T> onSuccess;
-
- Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
- this.onError = onError;
- this.onSuccess = onSuccess;
- }
-
- void error(ErrorType type, String message) {
- if ( ! handled.getAndSet(true))
- onError.accept(type, message);
- }
-
- void success(T result) {
- if ( ! handled.getAndSet(true))
- onSuccess.accept(result);
- }
-
- boolean handled() {
- return handled.get();
- }
-
- }
-
-
- private static class Completion {
-
- private final OperationContext context;
- private final Response response;
-
- private Completion(OperationContext context, Response response) {
- this.context = context;
- this.response = response;
- }
-
- static Completion of(OperationContext context) {
- return new Completion(requireNonNull(context), null);
- }
-
- static Completion of(Response response) {
- return new Completion(null, requireNonNull(response));
- }
-
- Completion merge(Completion other) {
- if (context == null)
- complete(other.context, response);
- else
- complete(context, other.response);
- return null;
- }
-
- private void complete(OperationContext context, Response response) {
- if (response.isSuccess())
- context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
- : Optional.empty());
- else
- context.error(toErrorType(response.outcome()), response.getTextMessage());
- }
-
- }
-
-
- /**
- * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
- *
- * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
- * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
- */
- static class DelayQueue {
-
- private final long maxSize;
- private final Clock clock;
- private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
- private final AtomicLong size = new AtomicLong(0);
- private final Thread maintainer;
- private final Duration delay;
- private final long defaultWaitMillis;
-
- public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
- if (maxSize < 0)
- throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
- if (delay.isNegative())
- throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
-
- this.maxSize = maxSize;
- this.delay = delay;
- this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
- this.clock = requireNonNull(clock);
- this.maintainer = new Thread(() -> maintain(action));
- this.maintainer.start();
- }
-
- boolean add(Supplier<Result> operation, OperationContext context) {
- if (size.incrementAndGet() > maxSize) {
- size.decrementAndGet();
- return false;
- }
- return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
- }
-
- long size() { return size.get(); }
-
- Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
- ExecutorService shutdownService = Executors.newSingleThreadExecutor();
- Future<?> future = shutdownService.submit(() -> {
- try {
- Thread.sleep(grace.toMillis());
- }
- finally {
- maintainer.interrupt();
- for (Delayed delayed; (delayed = queue.poll()) != null; ) {
- size.decrementAndGet();
- onShutdown.accept(delayed.context());
- }
- }
- return null;
- });
- shutdownService.shutdown();
- return future;
- }
-
- /**
- * Repeatedly loops through the queue, evicting already handled entries and processing those
- * which have become ready since last time, then waits until new items are guaranteed to be ready,
- * or until it's time for a new run just to ensure GC of handled entries.
- * The entries are assumed to always be added to the back of the queue, with the same delay.
- * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
- * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
- */
- private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) {
- while ( ! Thread.currentThread().isInterrupted()) {
- try {
- Instant waitUntil = null;
- Iterator<Delayed> operations = queue.iterator();
- while (operations.hasNext()) {
- Delayed delayed = operations.next();
- // Already handled: remove and continue.
- if (delayed.context().handled()) {
- operations.remove();
- size.decrementAndGet();
- continue;
- }
- // Ready for action: remove from queue and run.
- if (delayed.readyAt().isBefore(clock.instant())) {
- operations.remove();
- size.decrementAndGet();
- action.accept(delayed.operation(), delayed.context());
- continue;
- }
- // Not yet ready for action: keep time to wake up again.
- waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
- }
- long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
- synchronized (this) {
- do {
- notify();
- wait(Math.max(0, waitUntilMillis - clock.millis()));
- }
- while (clock.millis() < waitUntilMillis);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- catch (Exception e) {
- log.log(SEVERE, "Exception caught by delay queue maintainer", e);
- }
- }
- }
- }
-
-
- private static class Delayed {
-
- private final Supplier<Result> operation;
- private final OperationContext context;
- private final Instant readyAt;
-
- Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
- this.readyAt = requireNonNull(readyAt);
- this.context = requireNonNull(context);
- this.operation = requireNonNull(operation);
- }
-
- Supplier<Result> operation() { return operation; }
- OperationContext context() { return context; }
- Instant readyAt() { return readyAt; }
-
- }
-
-
- static class StorageCluster {
-
- private final String name;
- private final String configId;
- private final Map<String, String> documentBuckets;
-
- StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
- this.name = requireNonNull(name);
- this.configId = requireNonNull(configId);
- this.documentBuckets = Map.copyOf(documentBuckets);
- }
-
- String name() { return name; }
- String configId() { return configId; }
- String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
- Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
-
- }
-
-
- private static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
- if (clusters.isEmpty())
- throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled.");
-
- return wanted.map(cluster -> {
- if ( ! clusters.containsKey(cluster))
- throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.get(cluster);
- }).orElseGet(() -> {
- if (clusters.size() > 1)
- throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.values().iterator().next();
- });
- }
-
- private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
- List<String> bucketSpaces, Optional<String> bucketSpace) {
- return documentType.map(type -> cluster.bucketOf(type)
- .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
- "' is not mapped to a known bucket space")))
- .or(() -> bucketSpace.map(space -> {
- if ( ! bucketSpaces.contains(space))
- throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
- String.join(", ", bucketSpaces));
- return space;
- }))
- .orElse(FixedBucketSpaces.defaultSpace());
- }
-
-
-
- private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
- return clusters.storage().stream()
- .collect(toUnmodifiableMap(storage -> storage.name(),
- storage -> new StorageCluster(storage.name(),
- storage.configid(),
- buckets.cluster(storage.name())
- .documentType().entrySet().stream()
- .collect(toMap(entry -> entry.getKey(),
- entry -> entry.getValue().bucketSpace())))));
- }
-
-
- // Visible for testing.
- AsyncSession asyncSession() { return asyncSession; }
- Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
- void notifyMaintainers() throws InterruptedException {
- synchronized (throttled) { throttled.notify(); throttled.wait(); }
- synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
- }
-
-}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
deleted file mode 100644
index 96ea6c08f86..00000000000
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ /dev/null
@@ -1,603 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi.resource;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.google.inject.Inject;
-import com.yahoo.cloud.config.ClusterListConfig;
-import com.yahoo.container.core.documentapi.VespaDocumentAccess;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentOperation;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentTypeManager;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.TestAndSetCondition;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.document.json.JsonReader;
-import com.yahoo.document.json.JsonWriter;
-import com.yahoo.document.json.document.DocumentParser;
-import com.yahoo.document.restapi.DocumentOperationExecutor;
-import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
-import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
-import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
-import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
-import com.yahoo.documentapi.DocumentOperationParameters;
-import com.yahoo.documentapi.metrics.DocumentApiMetrics;
-import com.yahoo.documentapi.metrics.DocumentOperationStatus;
-import com.yahoo.documentapi.metrics.DocumentOperationType;
-import com.yahoo.jdisc.Metric;
-import com.yahoo.jdisc.Request;
-import com.yahoo.jdisc.Response;
-import com.yahoo.jdisc.handler.AbstractRequestHandler;
-import com.yahoo.jdisc.handler.CompletionHandler;
-import com.yahoo.jdisc.handler.ContentChannel;
-import com.yahoo.jdisc.handler.ReadableContentChannel;
-import com.yahoo.jdisc.handler.ResponseHandler;
-import com.yahoo.jdisc.handler.UnsafeContentInputStream;
-import com.yahoo.container.core.HandlerMetricContextUtil;
-import com.yahoo.jdisc.http.HttpRequest;
-import com.yahoo.jdisc.http.HttpRequest.Method;
-import com.yahoo.metrics.simple.MetricReceiver;
-import com.yahoo.restapi.Path;
-import com.yahoo.slime.Cursor;
-import com.yahoo.slime.Inspector;
-import com.yahoo.slime.Slime;
-import com.yahoo.slime.SlimeUtils;
-import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
-import com.yahoo.yolean.Exceptions;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.time.Clock;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.logging.Logger;
-
-import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
-import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
-import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
-import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
-import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
-import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
-import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.FINE;
-import static java.util.logging.Level.WARNING;
-import static java.util.stream.Collectors.joining;
-
-/**
- * Asynchronous HTTP handler for /document/v1/
- *
- * @author jonmv
- */
-public class DocumentV1ApiHandler extends AbstractRequestHandler {
-
- private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName());
- private static final Parser<Integer> numberParser = Integer::parseInt;
- private static final Parser<Boolean> booleanParser = Boolean::parseBoolean;
-
- private static final CompletionHandler logException = new CompletionHandler() {
- @Override public void completed() { }
- @Override public void failed(Throwable t) {
- log.log(WARNING, "Exception writing response data", t);
- }
- };
-
- private static final ContentChannel ignoredContent = new ContentChannel() {
- @Override public void write(ByteBuffer buf, CompletionHandler handler) { handler.completed(); }
- @Override public void close(CompletionHandler handler) { handler.completed(); }
- };
-
- private static final String CREATE = "create";
- private static final String CONDITION = "condition";
- private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get
- private static final String FIELD_SET = "fieldSet";
- private static final String SELECTION = "selection";
- private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get
- private static final String CONTINUATION = "continuation";
- private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
- private static final String CONCURRENCY = "concurrency";
- private static final String BUCKET_SPACE = "bucketSpace";
-
- private final Clock clock;
- private final Metric metric; // TODO jonmv: make response class which logs on completion/error
- private final DocumentApiMetrics metrics;
- private final DocumentOperationExecutor executor;
- private final DocumentOperationParser parser;
- private final Map<String, Map<Method, Handler>> handlers;
-
- @Inject
- public DocumentV1ApiHandler(Clock clock,
- Metric metric,
- MetricReceiver metricReceiver,
- VespaDocumentAccess documentAccess,
- DocumentmanagerConfig documentManagerConfig,
- ClusterListConfig clusterListConfig,
- AllClustersBucketSpacesConfig bucketSpacesConfig,
- DocumentOperationExecutorConfig executorConfig) {
- this(clock,
- new DocumentOperationExecutor(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
- new DocumentOperationParser(documentManagerConfig),
- metric,
- metricReceiver);
- }
-
- DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser,
- Metric metric, MetricReceiver metricReceiver) {
- this.clock = clock;
- this.executor = executor;
- this.parser = parser;
- this.metric = metric;
- this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
- this.handlers = defineApi();
- }
-
- @Override
- public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) {
- HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass());
- ResponseHandler responseHandler = response -> {
- HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass());
- return rawResponseHandler.handleResponse(response);
- };
-
- HttpRequest request = (HttpRequest) rawRequest;
- try {
- Path requestPath = new Path(request.getUri());
- for (String path : handlers.keySet())
- if (requestPath.matches(path)) {
- Map<Method, Handler> methods = handlers.get(path);
- if (methods.containsKey(request.getMethod()))
- return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
-
- if (request.getMethod() == OPTIONS)
- return options(methods.keySet(), responseHandler);
-
- return methodNotAllowed(request, methods.keySet(), responseHandler);
- }
- return notFound(request, handlers.keySet(), responseHandler);
- }
- catch (IllegalArgumentException e) {
- return badRequest(request, e, responseHandler);
- }
- catch (RuntimeException e) {
- return serverError(request, e, responseHandler);
- }
- }
-
- @Override
- public void destroy() {
- this.executor.shutdown();
- }
-
- private Map<String, Map<Method, Handler>> defineApi() {
- Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
-
- handlers.put("/document/v1/",
- Map.of(GET, this::getRoot));
-
- handlers.put("/document/v1/{namespace}/{documentType}/docid/",
- Map.of(GET, this::getDocumentType));
-
- handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
- Map.of(GET, this::getDocumentType));
-
- handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
- Map.of(GET, this::getDocumentType));
-
- handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}",
- Map.of(GET, this::getDocument,
- POST, this::postDocument,
- PUT, this::putDocument,
- DELETE, this::deleteDocument));
-
- handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/{docid}",
- Map.of(GET, this::getDocument,
- POST, this::postDocument,
- PUT, this::putDocument,
- DELETE, this::deleteDocument));
-
- handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/{docid}",
- Map.of(GET, this::getDocument,
- POST, this::postDocument,
- PUT, this::putDocument,
- DELETE, this::deleteDocument));
-
- return Collections.unmodifiableMap(handlers);
- }
-
- private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- Cursor documents = root.setArray("documents");
- executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler));
- return ignoredContent;
- }
-
- private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- VisitorOptions.Builder options = parseOptions(request, path);
- options = options.documentType(path.documentType());
- options = options.namespace(path.namespace());
- options = path.group().map(options::group).orElse(options);
- executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler));
- return ignoredContent;
- }
-
- private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) {
- Object monitor = new Object();
- return new VisitOperationsContext((type, message) -> {
- synchronized (monitor) {
- handleError(request, type, message, root, handler);
- }
- },
- token -> {
- token.ifPresent(value -> root.setString("continuation", value));
- synchronized (monitor) {
- respond(root, handler);
- }
- },
- // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway
- document -> {
- synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization.
- SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(),
- documents.addObject());
- }
- });
- }
- private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- DocumentId id = path.id();
- DocumentOperationParameters parameters = parameters();
- parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters);
- parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters);
- executor.get(id,
- parameters,
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- document -> {
- Cursor root = responseRoot(request, id);
- document.map(JsonWriter::toByteArray)
- .map(SlimeUtils::jsonToSlime)
- .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields)")));
- respond(document.isPresent() ? 200 : 404,
- root,
- handler);
- }));
- return ignoredContent;
- }
-
- private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
- ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.PUT, clock.instant());
- return new ForwardingContentChannel(in -> {
- try {
- DocumentPut put = parser.parsePut(in, id.toString());
- getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
- executor.put(put,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- }
- catch (IllegalArgumentException e) {
- badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
- }
- });
- }
-
- private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
- ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.UPDATE, clock.instant());
- return new ForwardingContentChannel(in -> {
- try {
- DocumentUpdate update = parser.parseUpdate(in, id.toString());
- getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
- getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
- executor.update(update,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- }
- catch (IllegalArgumentException e) {
- badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
- }
- });
- }
-
- private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
- ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.REMOVE, clock.instant());
- executor.remove(id,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- return ignoredContent;
- }
-
- private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) {
- switch (type) {
- case BAD_REQUEST:
- badRequest(request, message, root, handler);
- break;
- case NOT_FOUND:
- notFound(request, message, root, handler);
- break;
- case PRECONDITION_FAILED:
- preconditionFailed(request, message, root, handler);
- break;
- case OVERLOAD:
- overload(request, message, root, handler);
- break;
- case TIMEOUT:
- timeout(request, message, root, handler);
- break;
- default:
- log.log(WARNING, "Unexpected error type '" + type + "'");
- case ERROR: // intentional fallthrough
- serverError(request, message, root, handler);
- }
- }
-
- // ------------------------------------------------ Responses ------------------------------------------------
-
- private static Cursor responseRoot(HttpRequest request) {
- Cursor root = new Slime().setObject();
- root.setString("pathId", request.getUri().getRawPath());
- return root;
- }
-
- private static Cursor responseRoot(HttpRequest request, DocumentId id) {
- Cursor root = responseRoot(request);
- root.setString("id", id.toString());
- return root;
- }
-
- private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) {
- Response response = new Response(Response.Status.NO_CONTENT);
- response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
- handler.handleResponse(response).close(logException);
- return ignoredContent;
- }
-
- private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
- return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler);
- }
-
- private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.BAD_REQUEST, root, handler);
- }
-
- private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
- return notFound(request,
- "Nothing at '" + request.getUri().getRawPath() + "'. " +
- "Available paths are:\n" + String.join("\n", paths),
- responseRoot(request),
- handler);
- }
-
- private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- root.setString("message", message);
- return respond(Response.Status.NOT_FOUND, root, handler);
- }
-
- private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- root.setString("message",
- "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
- "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")));
- return respond(Response.Status.METHOD_NOT_ALLOWED,
- root,
- handler);
- }
-
- private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- root.setString("message", message);
- return respond(Response.Status.PRECONDITION_FAILED, root, handler);
- }
-
- private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.TOO_MANY_REQUESTS, root, handler);
- }
-
- private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) {
- log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e);
- Cursor root = responseRoot(request);
- root.setString("message", Exceptions.toMessageString(e));
- return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
- }
-
- private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
- }
-
- private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.GATEWAY_TIMEOUT, root, handler);
- }
-
- private static ContentChannel respond(Inspector root, ResponseHandler handler) {
- return respond(200, root, handler);
- }
-
- private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) {
- Response response = new Response(status);
- response.headers().put("Content-Type", "application/json; charset=UTF-8");
- ContentChannel out = null;
- try {
- out = handler.handleResponse(new Response(status));
- out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException);
- }
- catch (Exception e) {
- log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e));
- }
- finally {
- if (out != null) try {
- out.close(logException);
- }
- catch (Exception e) {
- log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e));
- }
- }
- return ignoredContent;
- }
-
- // ------------------------------------------------ Helpers ------------------------------------------------
-
- private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
- VisitorOptions.Builder options = VisitorOptions.builder();
-
- getProperty(request, SELECTION).ifPresent(options::selection);
- getProperty(request, CONTINUATION).ifPresent(options::continuation);
- getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
- getProperty(request, CLUSTER).ifPresent(options::cluster);
- getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
- getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
- .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
- getProperty(request, CONCURRENCY, numberParser)
- .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
-
- return options;
- }
-
- static class DocumentPath {
-
- private final Path path;
- private final Optional<Group> group;
-
- DocumentPath(Path path) {
- this.path = requireNonNull(path);
- this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
- .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
- }
-
- DocumentId id() {
- return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
- ":" + requireNonNull(path.get("documentType")) +
- ":" + group.map(Group::docIdPart).orElse("") +
- ":" + requireNonNull(path.get("docid")));
- }
-
- String documentType() { return requireNonNull(path.get("documentType")); }
- String namespace() { return requireNonNull(path.get("namespace")); }
- Optional<Group> group() { return group; }
-
- }
-
- private static Optional<String> getProperty(HttpRequest request, String name) {
- List<String> values = request.parameters().get(name);
- if (values != null && values.size() != 0)
- return Optional.ofNullable(values.get(values.size() - 1));
-
- return Optional.empty();
- }
-
- private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
- return getProperty(request, name).map(parser::parse);
- }
-
-
- @FunctionalInterface
- interface Parser<T> extends Function<String, T> {
- default T parse(String value) {
- try {
- return apply(value);
- }
- catch (RuntimeException e) {
- throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
- }
- }
- }
-
-
- @FunctionalInterface
- interface Handler {
- ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
- }
-
-
- /** Readable content channel which forwards data to a reader when closed. */
- static class ForwardingContentChannel implements ContentChannel {
-
- private final ReadableContentChannel delegate = new ReadableContentChannel();
- private final Consumer<InputStream> reader;
-
- public ForwardingContentChannel(Consumer<InputStream> reader) {
- this.reader = reader;
- }
-
- @Override
- public void write(ByteBuffer buf, CompletionHandler handler) {
- delegate.write(buf, handler);
- }
-
- @Override
- public void close(CompletionHandler handler) {
- delegate.close(handler);
- try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) {
- reader.accept(in);
- }
- }
-
- }
-
-
- private static class DocumentOperationParser {
-
- private static final JsonFactory jsonFactory = new JsonFactory();
-
- private final DocumentTypeManager manager;
-
- DocumentOperationParser(DocumentmanagerConfig config) {
- this.manager = new DocumentTypeManager(config);
- }
-
- DocumentPut parsePut(InputStream inputStream, String docId) {
- return (DocumentPut) parse(inputStream, docId, DocumentParser.SupportedOperation.PUT);
- }
-
- DocumentUpdate parseUpdate(InputStream inputStream, String docId) {
- return (DocumentUpdate) parse(inputStream, docId, DocumentParser.SupportedOperation.UPDATE);
- }
-
- private DocumentOperation parse(InputStream inputStream, String docId, DocumentParser.SupportedOperation operation) {
- return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId);
- }
-
- }
-
- private class MeasuringResponseHandler implements ResponseHandler {
-
- private final ResponseHandler delegate;
- private final DocumentOperationType type;
- private final Instant start;
-
- private MeasuringResponseHandler(ResponseHandler delegate, DocumentOperationType type, Instant start) {
- this.delegate = delegate;
- this.type = type;
- this.start = start;
- }
-
- @Override
- public ContentChannel handleResponse(Response response) {
- switch (response.getStatus() / 100) {
- case 2: metrics.reportSuccessful(type, start); break;
- case 4: metrics.reportFailure(type, DocumentOperationStatus.REQUEST_ERROR); break;
- case 5: metrics.reportFailure(type, DocumentOperationStatus.SERVER_ERROR); break;
- }
- return delegate.handleResponse(response);
- }
-
- }
-
-}
diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
deleted file mode 100644
index 19f4f50648b..00000000000
--- a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
+++ /dev/null
@@ -1,15 +0,0 @@
-# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package=com.yahoo.document.restapi
-
-# Delay before a throttled operation is retried.
-resendDelayMillis int default=100
-
-# Time between a document operation is received and a timeout response is sent
-defaultTimeoutSeconds int default=180
-
-# Time after which a visitor session times out
-visitTimeoutSeconds int default=120
-
-# Bound on number of document operations to keep in retry queue — further operations are rejected
-maxThrottled int default=200
-