summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-10-13 20:18:14 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-10-13 20:18:14 +0200
commit53e68ede523a33a6f7e87f991ee7c392ee3a088e (patch)
tree13b5528b38958f2f4735749dd356543462580290 /vespaclient-container-plugin/src
parent2931da13cbf55cb62ebc384d568914673f387bdf (diff)
Tests, more config and various fixes
Diffstat (limited to 'vespaclient-container-plugin/src')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java311
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java509
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java140
-rw-r--r--vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def16
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java85
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java406
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java494
7 files changed, 429 insertions, 1532 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
deleted file mode 100644
index d56c8c1524e..00000000000
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
+++ /dev/null
@@ -1,311 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi;
-
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.FixedBucketSpaces;
-import com.yahoo.document.fieldset.AllFields;
-import com.yahoo.documentapi.DocumentOperationParameters;
-import com.yahoo.documentapi.ProgressToken;
-import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.messagebus.StaticThrottlePolicy;
-import com.yahoo.text.Text;
-
-import java.time.Duration;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.StringJoiner;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Stream;
-
-/**
- * Wraps the document API with an executor that can retry and time out document operations,
- * as well as compute the required visitor parameters for visitor sessions.
- *
- * @author jonmv
- */
-public interface DocumentOperationExecutor {
-
- default void shutdown() { }
-
- void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context);
-
- void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context);
-
- void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context);
-
- void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context);
-
- void visit(VisitorOptions options, VisitOperationsContext context);
-
- String routeToCluster(String cluster);
-
- enum ErrorType {
- OVERLOAD,
- NOT_FOUND,
- PRECONDITION_FAILED,
- BAD_REQUEST,
- ERROR,
- TIMEOUT,
- INSUFFICIENT_STORAGE;
- }
-
-
- /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
- class Context<T> {
-
- private final AtomicBoolean handled = new AtomicBoolean();
- private final BiConsumer<ErrorType, String> onError;
- private final Consumer<T> onSuccess;
-
- Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
- this.onError = onError;
- this.onSuccess = onSuccess;
- }
-
- public void error(ErrorType type, String message) {
- if ( ! handled.getAndSet(true))
- onError.accept(type, message);
- }
-
- public void success(T result) {
- if ( ! handled.getAndSet(true))
- onSuccess.accept(result);
- }
-
- public boolean handled() {
- return handled.get();
- }
-
- }
-
-
- /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
- class VisitOperationsContext extends Context<Optional<String>> {
-
- private final Consumer<Document> onDocument;
-
- public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) {
- super(onError, onSuccess);
- this.onDocument = onDocument;
- }
-
- public void document(Document document) {
- if ( ! handled())
- onDocument.accept(document);
- }
-
- }
-
-
- /** Context for a document operation. */
- class OperationContext extends Context<Optional<Document>> {
-
- public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
- super(onError, onSuccess);
- }
-
- }
-
-
- class VisitorOptions {
-
- final Optional<String> cluster;
- final Optional<String> namespace;
- final Optional<String> documentType;
- final Optional<Group> group;
- final Optional<String> selection;
- final Optional<String> fieldSet;
- final Optional<String> continuation;
- final Optional<String> bucketSpace;
- final Optional<Integer> wantedDocumentCount;
- final Optional<Integer> concurrency;
-
- private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
- Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
- Optional<String> continuation, Optional<String> bucketSpace,
- Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
- this.cluster = cluster;
- this.namespace = namespace;
- this.documentType = documentType;
- this.group = group;
- this.selection = selection;
- this.fieldSet = fieldSet;
- this.continuation = continuation;
- this.bucketSpace = bucketSpace;
- this.wantedDocumentCount = wantedDocumentCount;
- this.concurrency = concurrency;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- VisitorOptions that = (VisitorOptions) o;
- return cluster.equals(that.cluster) &&
- namespace.equals(that.namespace) &&
- documentType.equals(that.documentType) &&
- group.equals(that.group) &&
- selection.equals(that.selection) &&
- fieldSet.equals(that.fieldSet) &&
- continuation.equals(that.continuation) &&
- bucketSpace.equals(that.bucketSpace) &&
- wantedDocumentCount.equals(that.wantedDocumentCount) &&
- concurrency.equals(that.concurrency);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency);
- }
-
- @Override
- public String toString() {
- return "VisitorOptions{" +
- "cluster=" + cluster +
- ", namespace=" + namespace +
- ", documentType=" + documentType +
- ", group=" + group +
- ", selection=" + selection +
- ", fieldSet=" + fieldSet +
- ", continuation=" + continuation +
- ", bucketSpace=" + bucketSpace +
- ", wantedDocumentCount=" + wantedDocumentCount +
- ", concurrency=" + concurrency +
- '}';
- }
-
- public static Builder builder() { return new Builder(); }
-
-
- public static class Builder {
-
- private String cluster;
- private String documentType;
- private String namespace;
- private Group group;
- private String selection;
- private String fieldSet;
- private String continuation;
- private String bucketSpace;
- private Integer wantedDocumentCount;
- private Integer concurrency;
-
- public Builder cluster(String cluster) {
- this.cluster = cluster;
- return this;
- }
-
- public Builder documentType(String documentType) {
- this.documentType = documentType;
- return this;
- }
-
- public Builder namespace(String namespace) {
- this.namespace = namespace;
- return this;
- }
-
- public Builder group(Group group) {
- this.group = group;
- return this;
- }
-
- public Builder selection(String selection) {
- this.selection = selection;
- return this;
- }
-
- public Builder fieldSet(String fieldSet) {
- this.fieldSet = fieldSet;
- return this;
- }
-
- public Builder continuation(String continuation) {
- this.continuation = continuation;
- return this;
- }
-
- public Builder bucketSpace(String bucketSpace) {
- this.bucketSpace = bucketSpace;
- return this;
- }
-
- public Builder wantedDocumentCount(Integer wantedDocumentCount) {
- this.wantedDocumentCount = wantedDocumentCount;
- return this;
- }
-
- public Builder concurrency(Integer concurrency) {
- this.concurrency = concurrency;
- return this;
- }
-
- public VisitorOptions build() {
- return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
- Optional.ofNullable(namespace), Optional.ofNullable(group),
- Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
- Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
- Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
- }
-
- }
-
- }
-
-
- class Group {
-
- private final String value;
- private final String docIdPart;
- private final String selection;
-
- private Group(String value, String docIdPart, String selection) {
- Text.validateTextString(value)
- .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
- this.value = value;
- this.docIdPart = docIdPart;
- this.selection = selection;
- }
-
- public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
- public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
-
- public String value() { return value; }
- public String docIdPart() { return docIdPart; }
- public String selection() { return selection; }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Group group = (Group) o;
- return value.equals(group.value) &&
- docIdPart.equals(group.docIdPart) &&
- selection.equals(group.selection);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(value, docIdPart, selection);
- }
-
- @Override
- public String toString() {
- return "Group{" +
- "value='" + value + '\'' +
- ", docIdPart='" + docIdPart + '\'' +
- ", selection='" + selection + '\'' +
- '}';
- }
-
- }
-
-}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
deleted file mode 100644
index 309bed38c0b..00000000000
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
+++ /dev/null
@@ -1,509 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi;
-
-import com.yahoo.cloud.config.ClusterListConfig;
-import com.yahoo.concurrent.DaemonThreadFactory;
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.document.FixedBucketSpaces;
-import com.yahoo.document.fieldset.AllFields;
-import com.yahoo.document.select.parser.ParseException;
-import com.yahoo.documentapi.AsyncParameters;
-import com.yahoo.documentapi.AsyncSession;
-import com.yahoo.documentapi.DocumentAccess;
-import com.yahoo.documentapi.DocumentOperationParameters;
-import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
-import com.yahoo.documentapi.ProgressToken;
-import com.yahoo.documentapi.Response;
-import com.yahoo.documentapi.ResponseHandler;
-import com.yahoo.documentapi.Result;
-import com.yahoo.documentapi.VisitorControlHandler;
-import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
-import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import com.yahoo.messagebus.StaticThrottlePolicy;
-import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
-import com.yahoo.yolean.Exceptions;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiPredicate;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-import java.util.logging.Logger;
-import java.util.stream.Stream;
-
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.INSUFFICIENT_STORAGE;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
-import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.SEVERE;
-import static java.util.logging.Level.WARNING;
-import static java.util.stream.Collectors.toMap;
-import static java.util.stream.Collectors.toUnmodifiableMap;
-
-/**
- * Encapsulates a document access and supports running asynchronous document
- * operations and visits against this, with retries and optional timeouts.
- *
- * @author jonmv
- */
-public class DocumentOperationExecutorImpl implements DocumentOperationExecutor {
-
- private static final Logger log = Logger.getLogger(DocumentOperationExecutorImpl.class.getName());
-
- private final Duration visitTimeout;
- private final long maxThrottled;
- private final DocumentAccess access;
- private final AsyncSession asyncSession;
- private final Map<String, StorageCluster> clusters;
- private final Clock clock;
- private final DelayQueue throttled;
- private final DelayQueue timeouts;
- private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
- private final ExecutorService visitSessionShutdownExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("visit-session-shutdown-"));
-
- public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
- DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
- this(Duration.ofMillis(executorConfig.resendDelayMillis()),
- Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
- Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
- executorConfig.maxThrottled(),
- access,
- parseClusters(clustersConfig, bucketsConfig),
- clock);
- }
-
- DocumentOperationExecutorImpl(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
- DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
- this.visitTimeout = requireNonNull(visitTimeout);
- this.maxThrottled = maxThrottled;
- this.access = requireNonNull(access);
- this.asyncSession = access.createAsyncSession(new AsyncParameters());
- this.clock = requireNonNull(clock);
- this.clusters = Map.copyOf(clusters);
- this.throttled = new DelayQueue(maxThrottled, this::send, Duration.ZERO, clock, "throttle");
- this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> {
- context.error(TIMEOUT, "Timed out after " + defaultTimeout);
- return true;
- }, defaultTimeout, clock, "timeout");
- }
-
- private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) {
- if (options.cluster.isEmpty() && options.documentType.isEmpty())
- throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
-
- VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection,
- options.documentType,
- options.namespace.map(value -> "id.namespace=='" + value + "'"),
- options.group.map(Group::selection))
- .flatMap(Optional::stream)
- .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
- StringJoiner::add,
- StringJoiner::merge)
- .toString());
-
- options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
- parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
- options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
- parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10));
- options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
- parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1)));
- parameters.setTimeoutMs(visitTimeout.toMillis());
- parameters.visitInconsistentBuckets(true);
- parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
-
- StorageCluster storageCluster = resolveCluster(options.cluster, clusters);
- parameters.setRoute(storageCluster.route());
- parameters.setBucketSpace(resolveBucket(storageCluster,
- options.documentType,
- List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
- options.bucketSpace));
-
- return parameters;
- }
-
- /** Assumes this stops receiving operations roughly when this is called, then waits up to 20 seconds to drain operations. */
- @Override
- public void shutdown() {
- long shutdownMillis = clock.instant().plusSeconds(20).toEpochMilli();
- visitSessionShutdownExecutor.shutdown();
- visits.values().forEach(VisitorSession::destroy);
- Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(10),
- context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
- Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(15),
- context -> context.error(TIMEOUT, "Timed out due to shutdown"));
- try {
- throttleShutdown.get(Math.max(1, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- timeoutShutdown.get(Math.max(1, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- visitSessionShutdownExecutor.awaitTermination(Math.max(1, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException | ExecutionException | TimeoutException e) {
- throttleShutdown.cancel(true);
- timeoutShutdown.cancel(true);
- log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
- }
- }
-
- @Override
- public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.get(id, parameters.withResponseHandler(handlerOf(parameters, context))), context);
- }
-
- @Override
- public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.put(put, parameters.withResponseHandler(handlerOf(parameters, context))), context);
- }
-
- @Override
- public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.update(update, parameters.withResponseHandler(handlerOf(parameters, context))), context);
- }
-
- @Override
- public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.remove(id, parameters.withResponseHandler(handlerOf(parameters, context))), context);
- }
-
- @Override
- public void visit(VisitorOptions options, VisitOperationsContext context) {
- try {
- AtomicBoolean done = new AtomicBoolean(false);
- VisitorParameters parameters = asParameters(options, clusters, visitTimeout);
- parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
- @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
- @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
- });
- parameters.setControlHandler(new VisitorControlHandler() {
- @Override public void onDone(CompletionCode code, String message) {
- super.onDone(code, message);
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets())
- context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
- case SUCCESS: // intentional fallthrough
- case ABORTED:
- context.success(Optional.ofNullable(getProgress())
- .filter(progress -> ! progress.isFinished())
- .map(ProgressToken::serializeToString));
- break;
- default:
- context.error(ERROR, message != null ? message : "Visiting failed");
- }
- done.set(true); // This may be reached before dispatching thread is done putting us in the map.
- visits.computeIfPresent(this, (__, session) -> {
- visitSessionShutdownExecutor.execute(() -> session.destroy());
- return null;
- });
- }
- });
- visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
- if (done.get())
- visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> {
- visitSessionShutdownExecutor.execute(() -> session.destroy());
- return null;
- });
- }
- catch (IllegalArgumentException | ParseException e) {
- context.error(BAD_REQUEST, Exceptions.toMessageString(e));
- }
- catch (RuntimeException e) {
- context.error(ERROR, Exceptions.toMessageString(e));
- }
- }
-
- @Override
- public String routeToCluster(String cluster) {
- return resolveCluster(Optional.of(cluster), clusters).route();
- }
-
- private ResponseHandler handlerOf(DocumentOperationParameters parameters, OperationContext context) {
- return response -> {
- parameters.responseHandler().ifPresent(originalHandler -> originalHandler.handleResponse(response));
- if (response.isSuccess())
- context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
- : Optional.empty());
- else
- context.error(toErrorType(response.outcome()), response.getTextMessage());
- };
- }
-
- /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
- private void accept(Supplier<Result> operation, OperationContext context) {
- timeouts.add(operation, context);
- if (throttled.size() > 0 || ! send(operation, context))
- if ( ! throttled.add(operation, context))
- context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
- }
-
- /** Attempts to send the given operation through the async session of this, returning {@code false} if throttled. */
- private boolean send(Supplier<Result> operation, OperationContext context) {
- Result result = operation.get();
- switch (result.type()) {
- case SUCCESS:
- return true;
- case TRANSIENT_ERROR:
- return false;
- default:
- log.log(WARNING, "Unknown result type '" + result.type() + "'");
- case FATAL_ERROR: // intentional fallthrough
- context.error(ERROR, result.getError().getMessage());
- return true; // Request handled, don't retry.
- }
- }
-
- private static ErrorType toErrorType(Response.Outcome outcome) {
- switch (outcome) {
- case NOT_FOUND:
- return NOT_FOUND;
- case CONDITION_FAILED:
- return PRECONDITION_FAILED;
- case INSUFFICIENT_STORAGE:
- return INSUFFICIENT_STORAGE;
- default:
- log.log(WARNING, "Unexpected response outcome: " + outcome);
- case ERROR: // intentional fallthrough
- return ERROR;
- }
- }
-
-
- /**
- * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
- *
- * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
- * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
- */
- static class DelayQueue {
-
- private final long maxSize;
- private final Clock clock;
- private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
- private final AtomicLong size = new AtomicLong(0);
- private final Thread maintainer;
- private final Duration delay;
- private final long defaultWaitMillis;
-
- public DelayQueue(long maxSize, BiPredicate<Supplier<Result>, OperationContext> action,
- Duration delay, Clock clock, String threadName) {
- if (maxSize < 0)
- throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
- if (delay.isNegative())
- throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
-
- this.maxSize = maxSize;
- this.delay = delay;
- this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
- this.clock = requireNonNull(clock);
- this.maintainer = new DaemonThreadFactory("document-operation-executor-" + threadName).newThread(() -> maintain(action));
- this.maintainer.start();
- }
-
- boolean add(Supplier<Result> operation, OperationContext context) {
- if (size.incrementAndGet() > maxSize) {
- size.decrementAndGet();
- return false;
- }
- return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
- }
-
- long size() { return size.get(); }
-
- Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
- ExecutorService shutdownService = Executors.newSingleThreadExecutor();
- Future<?> future = shutdownService.submit(() -> {
- try {
- long doomMillis = clock.millis() + grace.toMillis();
- while (size.get() > 0 && clock.millis() < doomMillis)
- Thread.sleep(100);
- }
- finally {
- maintainer.interrupt();
- for (Delayed delayed; (delayed = queue.poll()) != null; ) {
- size.decrementAndGet();
- onShutdown.accept(delayed.context());
- }
- }
- return null;
- });
- shutdownService.shutdown();
- return future;
- }
-
- /**
- * Repeatedly loops through the queue, evicting already handled entries and processing those
- * which have become ready since last time, then waits until new items are guaranteed to be ready,
- * or until it's time for a new run just to ensure GC of handled entries.
- * The entries are assumed to always be added to the back of the queue, with the same delay.
- * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
- * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
- */
- private void maintain(BiPredicate<Supplier<Result>, OperationContext> action) {
- while ( ! Thread.currentThread().isInterrupted()) {
- try {
- Instant waitUntil = null;
- Iterator<Delayed> operations = queue.iterator();
- boolean rejected = false;
- while (operations.hasNext()) {
- Delayed delayed = operations.next();
- // Already handled: remove and continue.
- if (delayed.context().handled()) {
- operations.remove();
- size.decrementAndGet();
- continue;
- }
- // Ready for action: remove from queue and run unless an operation was already rejected.
- if (delayed.readyAt().isBefore(clock.instant()) && ! rejected) {
- if (action.test(delayed.operation(), delayed.context())) {
- operations.remove();
- size.decrementAndGet();
- continue;
- }
- else { // If an operation is rejected, handle no more this run, and wait a short while before retrying.
- waitUntil = clock.instant().plus(Duration.ofMillis(10));
- rejected = true;
- }
- }
- // Not yet ready for action: keep time to wake up again.
- waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
- }
- long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
- synchronized (this) {
- do {
- notify();
- wait(Math.max(1, waitUntilMillis - clock.millis()));
- }
- while (clock.millis() < waitUntilMillis);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- catch (Exception e) {
- log.log(SEVERE, "Exception caught by delay queue maintainer", e);
- }
- }
- }
- }
-
-
- private static class Delayed {
-
- private final Supplier<Result> operation;
- private final OperationContext context;
- private final Instant readyAt;
-
- Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
- this.readyAt = requireNonNull(readyAt);
- this.context = requireNonNull(context);
- this.operation = requireNonNull(operation);
- }
-
- Supplier<Result> operation() { return operation; }
- OperationContext context() { return context; }
- Instant readyAt() { return readyAt; }
-
- }
-
-
- static class StorageCluster {
-
- private final String name;
- private final String configId;
- private final Map<String, String> documentBuckets;
-
- StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
- this.name = requireNonNull(name);
- this.configId = requireNonNull(configId);
- this.documentBuckets = Map.copyOf(documentBuckets);
- }
-
- String name() { return name; }
- String configId() { return configId; }
- String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
- Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
-
- }
-
-
- static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
- if (clusters.isEmpty())
- throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled");
-
- return wanted.map(cluster -> {
- if ( ! clusters.containsKey(cluster))
- throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.get(cluster);
- }).orElseGet(() -> {
- if (clusters.size() > 1)
- throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.values().iterator().next();
- });
- }
-
- private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
- List<String> bucketSpaces, Optional<String> bucketSpace) {
- return documentType.map(type -> cluster.bucketOf(type)
- .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
- "' is not mapped to a known bucket space")))
- .or(() -> bucketSpace.map(space -> {
- if ( ! bucketSpaces.contains(space))
- throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
- String.join(", ", bucketSpaces));
- return space;
- }))
- .orElse(FixedBucketSpaces.defaultSpace());
- }
-
-
-
- private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
- return clusters.storage().stream()
- .collect(toUnmodifiableMap(storage -> storage.name(),
- storage -> new StorageCluster(storage.name(),
- storage.configid(),
- buckets.cluster(storage.name())
- .documentType().entrySet().stream()
- .collect(toMap(entry -> entry.getKey(),
- entry -> entry.getValue().bucketSpace())))));
- }
-
-
- // Visible for testing.
- AsyncSession asyncSession() { return asyncSession; }
- Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
- void notifyMaintainers() throws InterruptedException {
- synchronized (throttled) { throttled.notify(); throttled.wait(); }
- synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
- }
-
-}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 03546e02d61..5918fe9b1b0 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -162,13 +162,26 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
ClusterListConfig clusterListConfig,
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
- this(Clock.systemUTC(),
- new DocumentOperationParser(documentManagerConfig),
- metric,
- metricReceiver,
- executorConfig.maxThrottled(),
- documentAccess,
- parseClusters(clusterListConfig, bucketSpacesConfig));
+ this(Clock.systemUTC(), metric, metricReceiver, documentAccess,
+ documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig);
+ }
+
+ DocumentV1ApiHandler(Clock clock, Metric metric, MetricReceiver metricReceiver, DocumentAccess access,
+ DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig,
+ ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) {
+ this.clock = clock;
+ this.parser = new DocumentOperationParser(documentmanagerConfig);
+ this.metric = metric;
+ this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+ this.maxThrottled = executorConfig.maxThrottled();
+ this.access = access;
+ this.asyncSession = access.createAsyncSession(new AsyncParameters());
+ this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
+ this.operations = new ConcurrentLinkedDeque<>();
+ this.executor.scheduleWithFixedDelay(this::dispatchEnqueued,
+ executorConfig.resendDelayMillis(),
+ executorConfig.resendDelayMillis(),
+ TimeUnit.MILLISECONDS);
}
DocumentV1ApiHandler(Clock clock, DocumentOperationParser parser, Metric metric, MetricReceiver metricReceiver,
@@ -243,21 +256,26 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
}
+ @FunctionalInterface
+ interface Handler {
+ ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
+ }
+
/** Defines all paths/methods handled by this handler. */
private Map<String, Map<Method, Handler>> defineApi() {
Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
handlers.put("/document/v1/",
- Map.of(GET, this::getRoot));
+ Map.of(GET, this::getDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/docid/",
- Map.of(GET, this::getDocumentType));
+ Map.of(GET, this::getDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
- Map.of(GET, this::getDocumentType));
+ Map.of(GET, this::getDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
- Map.of(GET, this::getDocumentType));
+ Map.of(GET, this::getDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}",
Map.of(GET, this::getDocument,
@@ -280,26 +298,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return Collections.unmodifiableMap(handlers);
}
- private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- enqueueAndDispatch(request, handler, () -> {
- VisitorOptions options = parseOptions(request, path).build();
- return () -> {
- visit(request, options, handler);
- return true; // VisitorSession has its own throttle handling.
- };
- });
- return ignoredContent;
- }
-
- private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
- VisitorOptions.Builder optionsBuilder = parseOptions(request, path);
- optionsBuilder = optionsBuilder.documentType(path.documentType());
- optionsBuilder = optionsBuilder.namespace(path.namespace());
- optionsBuilder = path.group().map(optionsBuilder::group).orElse(optionsBuilder);
- VisitorOptions options = optionsBuilder.build();
+ VisitorParameters parameters = parseParameters(request, path);
return () -> {
- visit(request, options, handler);
+ visit(request, parameters, handler);
return true; // VisitorSession has its own throttle handling.
};
});
@@ -372,7 +375,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
/** Dispatches enqueued requests until one is blocked. */
- private void dispatchEnqueued() {
+ void dispatchEnqueued() {
try {
while (dispatchFirst());
}
@@ -406,7 +409,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return;
}
Operation operation = Operation.lazilyParsed(request, handler, operationParser);
- if (enqueued.get() == 0 && operation.dispatch()) // Bypass queue if it is empty.
+ if (enqueued.get() == 1 && operation.dispatch()) // Bypass queue if it is empty.
enqueued.decrementAndGet();
else {
operations.offer(operation);
@@ -414,10 +417,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
}
- @FunctionalInterface
- interface Handler {
- ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
- }
// ------------------------------------------------ Responses ------------------------------------------------
@@ -612,7 +611,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AtomicReference<Operation> operation = new AtomicReference<>();
return () -> {
try {
- operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch();
+ return operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch();
}
catch (IllegalArgumentException e) {
badRequest(request, e, handler);
@@ -742,62 +741,51 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ------------------------------------------------- Visits ------------------------------------------------
- private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
- VisitorOptions.Builder options = VisitorOptions.builder();
+ private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) {
+ int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, numberParser).orElse(1 << 10));
+ if (wantedDocumentCount <= 0)
+ throw new IllegalArgumentException("wantedDocumentCount must be positive");
- getProperty(request, SELECTION).ifPresent(options::selection);
- getProperty(request, CONTINUATION).ifPresent(options::continuation);
- getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
- getProperty(request, CLUSTER).ifPresent(options::cluster);
- getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
- getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
- .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
- getProperty(request, CONCURRENCY, numberParser)
- .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
+ int concurrency = Math.min(100, getProperty(request, CONCURRENCY, numberParser).orElse(1));
+ if (concurrency <= 0)
+ throw new IllegalArgumentException("concurrency must be positive");
- return options;
- }
-
- private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) {
- if (options.cluster.isEmpty() && options.documentType.isEmpty())
+ Optional<String> cluster = getProperty(request, CLUSTER);
+ if (cluster.isEmpty() && path.documentType().isEmpty())
throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
- VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection,
- options.documentType,
- options.namespace.map(value -> "id.namespace=='" + value + "'"),
- options.group.map(Group::selection))
+ VisitorParameters parameters = new VisitorParameters(Stream.of(getProperty(request, SELECTION),
+ path.documentType(),
+ path.namespace().map(value -> "id.namespace=='" + value + "'"),
+ path.group().map(Group::selection))
.flatMap(Optional::stream)
.reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
StringJoiner::add,
StringJoiner::merge)
.toString());
- options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
- parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
- options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
- parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10));
- options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
- parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1)));
+ getProperty(request, CONTINUATION).map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
+ parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME)));
+ parameters.setMaxTotalHits(wantedDocumentCount);
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency));
parameters.setTimeoutMs(visitTimeout.toMillis());
parameters.visitInconsistentBuckets(true);
parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
- StorageCluster storageCluster = resolveCluster(options.cluster, clusters);
+ StorageCluster storageCluster = resolveCluster(cluster, clusters);
parameters.setRoute(storageCluster.route());
parameters.setBucketSpace(resolveBucket(storageCluster,
- options.documentType,
+ path.documentType(),
List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
- options.bucketSpace));
-
+ getProperty(request, BUCKET_SPACE)));
return parameters;
}
- private void visit(HttpRequest request, VisitorOptions options, ResponseHandler handler) {
+ private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
try {
JsonResponse response = JsonResponse.create(request, handler);
response.writeDocumentsArrayStart();
CountDownLatch latch = new CountDownLatch(1);
- VisitorParameters parameters = asParameters(options, clusters, visitTimeout);
parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
@Override public void onDocument(Document doc, long timeStamp) {
loggingException(() -> {
@@ -815,7 +803,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
case TIMEOUT:
if ( ! hasVisitedAnyBuckets()) {
response.writeMessage("No buckets visited within timeout of " + visitTimeout);
- response.commit(Response.Status.GATEWAY_TIMEOUT);
+ response.respond(Response.Status.GATEWAY_TIMEOUT);
break;
}
case SUCCESS: // Intentional fallthrough.
@@ -823,11 +811,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (getProgress() != null && ! getProgress().isFinished())
response.writeContinuation(getProgress().serializeToString());
- response.commit(Response.Status.OK);
+ response.respond(Response.Status.OK);
break;
default:
response.writeMessage(message != null ? message : "Visiting failed");
- response.commit(Response.Status.INTERNAL_SERVER_ERROR);
+ response.respond(Response.Status.INTERNAL_SERVER_ERROR);
}
executor.execute(() -> {
try {
@@ -1103,8 +1091,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
- List<String> bucketSpaces, Optional<String> bucketSpace) {
+ static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
+ List<String> bucketSpaces, Optional<String> bucketSpace) {
return documentType.map(type -> cluster.bucketOf(type)
.orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
"' is not mapped to a known bucket space")))
@@ -1136,8 +1124,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
String rawPath() { return path.asString(); }
- String documentType() { return requireNonNull(path.get("documentType")); }
- String namespace() { return requireNonNull(path.get("namespace")); }
+ Optional<String> documentType() { return Optional.ofNullable(path.get("documentType")); }
+ Optional<String> namespace() { return Optional.ofNullable(path.get("namespace")); }
Optional<Group> group() { return group; }
}
@@ -1157,7 +1145,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
- public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
+ public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\\\'") + "'"); }
public String value() { return value; }
public String docIdPart() { return docIdPart; }
diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
index 770189f90f5..686b33d8cd5 100644
--- a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
+++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def
@@ -1,15 +1,15 @@
# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package=com.yahoo.document.restapi
-# Delay before a throttled operation is retried.
-resendDelayMillis int default=100
-
-# Time between a document operation is received and a timeout response is sent
-defaultTimeoutSeconds int default=180
-
-# Time after which a visitor session times out
-visitTimeoutSeconds int default=120
+# Duration for which resender thread sleeps after an operation is throttled
+resendDelayMillis int default=10
# Bound on number of document operations to keep in retry queue — further operations are rejected
maxThrottled int default=1000
+# Whether to perform dispatch from Jetty threads
+doDispatchWithEnqueue bool default=true
+
+# Number of workers in the resender thread pool
+resenderCount int default=1
+
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java
deleted file mode 100644
index 3d350adab87..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi;
-
-import com.yahoo.document.DocumentGet;
-import com.yahoo.document.DocumentId;
-import com.yahoo.document.DocumentOperation;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentRemove;
-import com.yahoo.document.DocumentUpdate;
-import com.yahoo.documentapi.DocumentOperationParameters;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * @author jonmv
- */
-public class DocumentOperationExecutorMock implements DocumentOperationExecutor {
-
- final AtomicReference<DocumentOperation> lastOperation = new AtomicReference<>();
- final AtomicReference<DocumentOperationParameters> lastParameters = new AtomicReference<>();
- final AtomicReference<OperationContext> lastOperationContext = new AtomicReference<>();
- final AtomicReference<VisitorOptions> lastOptions = new AtomicReference<>();
- final AtomicReference<VisitOperationsContext> lastVisitContext = new AtomicReference<>();
-
- @Override
- public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- setLastOperation(new DocumentGet(id), parameters, context);
- }
-
- @Override
- public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
- setLastOperation(put, parameters, context);
- }
-
- @Override
- public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
- setLastOperation(update, parameters, context);
- }
-
- @Override
- public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- setLastOperation(new DocumentRemove(id), parameters, context);
- }
-
- @Override
- public void visit(VisitorOptions options, VisitOperationsContext context) {
- lastOptions.set(options);
- lastVisitContext.set(context);
- }
-
- @Override
- public String routeToCluster(String cluster) {
- if ("throw-me".equals(cluster))
- throw new IllegalArgumentException(cluster);
-
- return "route-to-" + cluster;
- }
-
- public DocumentOperation lastOperation() {
- return lastOperation.get();
- }
-
- public DocumentOperationParameters lastParameters() {
- return lastParameters.get();
- }
-
- public OperationContext lastOperationContext() {
- return lastOperationContext.get();
- }
-
- public VisitorOptions lastOptions() {
- return lastOptions.get();
- }
-
- public VisitOperationsContext lastVisitContext() {
- return lastVisitContext.get();
- }
-
- private void setLastOperation(DocumentOperation operation, DocumentOperationParameters parameters, OperationContext context) {
- lastOperation.set(operation);
- lastParameters.set(parameters);
- lastOperationContext.set(context);
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
deleted file mode 100644
index 1d2f6af35dd..00000000000
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
+++ /dev/null
@@ -1,406 +0,0 @@
-// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.document.restapi;
-
-import com.yahoo.application.container.DocumentAccesses;
-import com.yahoo.cloud.config.ClusterListConfig;
-import com.yahoo.document.Document;
-import com.yahoo.document.DocumentPut;
-import com.yahoo.document.DocumentType;
-import com.yahoo.document.FixedBucketSpaces;
-import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
-import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
-import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
-import com.yahoo.document.restapi.DocumentOperationExecutorImpl.StorageCluster;
-import com.yahoo.document.restapi.DocumentOperationExecutorImpl.DelayQueue;
-import com.yahoo.documentapi.Result;
-import com.yahoo.documentapi.VisitorControlHandler;
-import com.yahoo.documentapi.local.LocalAsyncSession;
-import com.yahoo.documentapi.local.LocalDocumentAccess;
-import com.yahoo.test.ManualClock;
-import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
-
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
-import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * This test uses a config definition for the "music" document type, which has a single string field "artist".
- * One cluster named "content" exists, and can be reached through the "route" route for "music" documents.
- *
- * @author jonmv
- */
-public class DocumentOperationExecutorTest {
-
- final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder()
- .cluster("content",
- new AllClustersBucketSpacesConfig.Cluster.Builder()
- .documentType("music",
- new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
- .bucketSpace(FixedBucketSpaces.defaultSpace())))
- .build();
- final ClusterListConfig clusterConfig = new ClusterListConfig.Builder()
- .storage(new ClusterListConfig.Storage.Builder().configid("config-id")
- .name("content"))
- .build();
- final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder()
- .resendDelayMillis(10)
- .defaultTimeoutSeconds(1)
- .maxThrottled(2)
- .build();
- final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content",
- "config-id",
- Map.of("music", "route")));
- final List<Document> received = new ArrayList<>();
- final List<ErrorType> errors = new ArrayList<>();
- final List<String> messages = new ArrayList<>();
- final List<String> tokens = new ArrayList<>();
- ManualClock clock;
- LocalDocumentAccess access;
- DocumentOperationExecutorImpl executor;
- DocumentType musicType;
- Document doc1;
- Document doc2;
- Document doc3;
-
- OperationContext operationContext() {
- return new OperationContext((type, error) -> { errors.add(type); messages.add(error); },
- document -> document.ifPresent(received::add));
- }
-
- VisitOperationsContext visitContext() {
- return new VisitOperationsContext((type, error) -> { errors.add(type); messages.add(error); },
- token -> token.ifPresent(tokens::add),
- received::add);
- }
-
- LocalAsyncSession session() {
- return (LocalAsyncSession) executor.asyncSession();
- }
-
- @Before
- public void setUp() {
- clock = new ManualClock();
- access = DocumentAccesses.createFromSchemas("src/test/cfg");
- executor = new DocumentOperationExecutorImpl(clusterConfig, bucketConfig, executorConfig, access, clock);
- received.clear();
- errors.clear();
- tokens.clear();
-
- musicType = access.getDocumentTypeManager().getDocumentType("music");
- doc1 = new Document(musicType, "id:ns:music::1"); doc1.setFieldValue("artist", "one");
- doc2 = new Document(musicType, "id:ns:music:n=1:2"); doc2.setFieldValue("artist", "two");
- doc3 = new Document(musicType, "id:ns:music:g=a:3");
- }
-
- @After
- public void tearDown() {
- access.shutdown();
- }
-
- @Test
- public void testResolveCluster() {
- assertEquals("[Storage:cluster=content;clusterconfigid=config-id]",
- executor.routeToCluster("content"));
- try {
- executor.routeToCluster("blargh");
- fail("Should not find this cluster");
- }
- catch (IllegalArgumentException e) {
- assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage());
- }
- assertEquals("content", DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), clusters).name());
- try {
- DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of());
- fail("No clusters should fail");
- }
- catch (IllegalArgumentException e) {
- assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage());
- }
- try {
- Map<String, StorageCluster> twoClusters = new TreeMap<>();
- twoClusters.put("one", new StorageCluster("one", "one-config", Map.of()));
- twoClusters.put("two", new StorageCluster("two", "two-config", Map.of()));
- DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), twoClusters);
- fail("More than one cluster and no document type should fail");
- }
- catch (IllegalArgumentException e) {
- assertEquals("Please specify one of the content clusters in your Vespa deployment: 'one', 'two'", e.getMessage());
- }
- }
-
- @Test
- public void testThrottling() throws InterruptedException {
- executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts.
- // Put documents 1 and 2 into backend.
- executor.put(new DocumentPut(doc1), parameters(), operationContext());
- executor.put(new DocumentPut(doc2), parameters(), operationContext());
- assertEquals(List.of(doc1, doc2), received);
-
- session().setResultType(Result.ResultType.TRANSIENT_ERROR);
-
- // First two are put on retry queue.
- executor.get(doc1.getId(), parameters(), operationContext());
- executor.get(doc2.getId(), parameters(), operationContext());
- assertEquals(List.of(), errors);
-
- // Third operation is rejected.
- executor.get(doc3.getId(), parameters(), operationContext());
- assertEquals(List.of(OVERLOAD), errors);
-
- // Maintainer does not yet run.
- executor.notifyMaintainers();
- // Third operation is rejected again.
- executor.get(doc3.getId(), parameters(), operationContext());
- assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
-
- // Maintainer retries documents, but they're put back into the queue with a new delay.
- clock.advance(Duration.ofMillis(20));
- executor.notifyMaintainers();
- assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
-
- session().setResultType(Result.ResultType.SUCCESS);
- // Maintainer retries documents again, this time successfully.
- clock.advance(Duration.ofMillis(20));
- executor.notifyMaintainers();
- assertEquals(List.of(OVERLOAD, OVERLOAD), errors);
- assertEquals(List.of(doc1, doc2, doc1, doc2), received);
- }
-
- @Test
- public void testTimeout() throws InterruptedException {
- Phaser phaser = new Phaser(1);
- access.setPhaser(phaser);
- executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts.
-
- // Put 1 times out after 1010 ms, Put 2 succeeds after 1010 ms
- executor.put(new DocumentPut(doc1), parameters(), operationContext());
- clock.advance(Duration.ofMillis(20));
- executor.put(new DocumentPut(doc2), parameters(), operationContext());
- executor.notifyMaintainers();
- assertEquals(List.of(), errors);
- assertEquals(List.of(), received);
-
- clock.advance(Duration.ofMillis(990));
- executor.notifyMaintainers(); // Let doc1 time out.
- phaser.arriveAndAwaitAdvance(); // Let doc2 arrive.
- phaser.arriveAndAwaitAdvance(); // Wait for responses to be delivered.
- assertEquals(List.of(TIMEOUT), errors);
- assertEquals(List.of(doc2), received);
-
- session().setResultType(Result.ResultType.TRANSIENT_ERROR);
- executor.put(new DocumentPut(doc3), parameters(), operationContext());
- clock.advance(Duration.ofMillis(990));
- executor.notifyMaintainers(); // Retry throttled operation.
- clock.advance(Duration.ofMillis(20));
- executor.notifyMaintainers(); // Time out throttled operation.
- assertEquals(List.of(TIMEOUT, TIMEOUT), errors);
- assertEquals(List.of(doc2), received);
-
- session().setResultType(Result.ResultType.SUCCESS);
- clock.advance(Duration.ofMillis(20));
- executor.notifyMaintainers(); // Retry not attempted since operation already timed out.
- phaser.arriveAndAwaitAdvance();
- phaser.arriveAndAwaitAdvance();
- assertEquals(List.of(TIMEOUT, TIMEOUT), errors);
- assertEquals(List.of(doc2), received);
- }
-
- @Test
- public void testCallback() {
- AtomicBoolean called = new AtomicBoolean();
- executor.get(doc1.getId(), parameters().withResponseHandler(__ -> called.set(true)), operationContext());
- assertTrue(called.get());
- assertEquals(List.of(), messages);
- assertEquals(List.of(), errors);
- assertEquals(List.of(), received);
- }
-
- @Test
- public void testVisit() throws InterruptedException {
- executor.put(new DocumentPut(doc1), parameters(), operationContext());
- executor.put(new DocumentPut(doc2), parameters(), operationContext());
- executor.put(new DocumentPut(doc3), parameters(), operationContext());
- assertEquals(doc1, received.remove(0));
- assertEquals(doc2, received.remove(0));
- assertEquals(doc3, received.remove(0));
-
- // No cluster or document type set.
- executor.visit(VisitorOptions.builder()
- .build(),
- visitContext());
- assertEquals("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level", messages.remove(0));
- assertEquals(BAD_REQUEST, errors.remove(0));
- assertEquals(List.of(), received);
-
- // Cluster not found.
- executor.visit(VisitorOptions.builder()
- .cluster("blargh")
- .build(),
- visitContext());
- assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", messages.remove(0));
- assertEquals(BAD_REQUEST, errors.remove(0));
- assertEquals(List.of(), received);
-
- // Matches doc2 for user 1.
- executor.visit(VisitorOptions.builder()
- .cluster("content")
- .group(Group.of(1))
- .build(),
- visitContext());
- for (VisitorControlHandler session : executor.visitorSessions()) {
- session.waitUntilDone();
- }
- assertEquals(List.of(), messages);
- assertEquals(List.of(), errors);
- assertEquals(doc2, received.remove(0));
-
- // Matches documents in namespace ns of type music in group a.
- executor.visit(VisitorOptions.builder()
- .concurrency(2)
- .wantedDocumentCount(3)
- .namespace("ns")
- .documentType("music")
- .fieldSet("music:artist")
- .group(Group.of("a"))
- .build(),
- visitContext());
- for (VisitorControlHandler session : executor.visitorSessions())
- session.waitUntilDone();
- assertEquals(List.of(), messages);
- assertEquals(List.of(), errors);
- assertEquals(doc3, received.remove(0));
-
- // Matches documents with non-empty artist field.
- executor.visit(VisitorOptions.builder()
- .cluster("content")
- .selection("music.artist")
- .fieldSet("[id]")
- .build(),
- visitContext());
- for (VisitorControlHandler session : executor.visitorSessions())
- session.waitUntilDone();
- assertEquals(List.of(), messages);
- assertEquals(List.of(), errors);
- assertEquals(List.of(doc1.getId(), doc2.getId()), List.of(received.remove(0).getId(), received.remove(0).getId()));
-
- // Matches all documents, but we'll shut down midway.
- Phaser phaser = new Phaser(1);
- access.setPhaser(phaser);
- executor.visit(VisitorOptions.builder()
- .cluster("content")
- .bucketSpace("global")
- .build(),
- visitContext());
- phaser.arriveAndAwaitAdvance(); // First document pending
- CountDownLatch latch = new CountDownLatch(1);
- Thread shutdownThread = new Thread(() -> {
- executor.shutdown();
- latch.countDown();
- });
- shutdownThread.start();
- clock.advance(Duration.ofMillis(100));
- executor.notifyMaintainers(); // Purge timeout operations so maintainers can shut down quickly.
- latch.await(); // Make sure visit session is shut down before next document is considered.
- phaser.awaitAdvance(phaser.arriveAndDeregister()); // See above.
- for (VisitorControlHandler session : executor.visitorSessions()) {
- session.waitUntilDone();
- }
- assertEquals(List.of(), messages);
- assertEquals(List.of(), errors);
- assertEquals(List.of(doc1), received);
- }
-
- @Test
- public void testDelayQueue() throws ExecutionException, InterruptedException, TimeoutException {
- Supplier<Result> nullOperation = () -> null;
- AtomicLong counter1 = new AtomicLong(0);
- AtomicLong counter2 = new AtomicLong(0);
- AtomicLong counter3 = new AtomicLong(0);
- AtomicBoolean throttle = new AtomicBoolean(true);
- OperationContext context1 = new OperationContext((type, message) -> counter1.decrementAndGet(), doc -> counter1.incrementAndGet());
- OperationContext context2 = new OperationContext((type, message) -> counter2.decrementAndGet(), doc -> counter2.incrementAndGet());
- OperationContext context3 = new OperationContext((type, message) -> counter3.decrementAndGet(), doc -> counter3.incrementAndGet());
- DelayQueue queue = new DelayQueue(3,
- (operation, context) -> {
- if (throttle.get())
- return false;
-
- context.success(Optional.empty());
- return true;
- },
- Duration.ofMillis(30),
- clock,
- "test");
- synchronized (queue) { queue.notify(); queue.wait(); } // Make sure maintainers have gone to wait before test starts.
-
- // Add three operations:
- //  the first shall be handled by the queue on second attempt,
- // the second by an external call,and
- // the third during shutdown — added later.
- assertTrue(queue.add(nullOperation, context1));
- clock.advance(Duration.ofMillis(20));
- assertTrue(queue.add(nullOperation, context2));
- assertTrue(queue.add(nullOperation, context3));
- assertFalse("New entries should be rejected by a full queue", queue.add(nullOperation, context3));
- assertEquals(3, queue.size());
- assertEquals(0, counter1.get());
- assertEquals(0, counter2.get());
- assertEquals(0, counter3.get());
-
- context2.error(ERROR, "error"); // Marks this as handled, ready to be evicted.
- synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer does not run yet, as it's not yet time.
- assertEquals(0, counter1.get());
- assertEquals(-1, counter2.get());
- assertEquals(0, counter3.get());
- assertEquals(3, queue.size());
-
- clock.advance(Duration.ofMillis(15));
- synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer now runs, failing to handle first and evicting second entry.
- assertEquals(0, counter1.get());
- assertEquals(-1, counter2.get());
- assertEquals(0, counter3.get());
- assertEquals(2, queue.size());
-
- throttle.set(false);
- clock.advance(Duration.ofMillis(15));
- synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer runs again, successfully handling first entry.
- assertEquals(1, counter1.get());
- assertEquals(-1, counter2.get());
- assertEquals(0, counter3.get());
- assertEquals(1, queue.size());
-
- queue.shutdown(Duration.ZERO, context -> context.error(ERROR, "shutdown"))
- .get(1, TimeUnit.SECONDS);
- assertEquals(1, counter1.get());
- assertEquals(-1, counter2.get());
- assertEquals(-1, counter3.get());
- assertEquals(0, queue.size());
- }
-
-}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index c6b611ed3c7..6de21e82524 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -1,31 +1,55 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.restapi.resource;
+import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.container.jdisc.RequestHandlerTestDriver;
import com.yahoo.docproc.jdisc.metric.NullMetric;
import com.yahoo.document.Document;
-import com.yahoo.document.DocumentGet;
+import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentRemove;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.document.datatypes.StringFieldValue;
-import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
-import com.yahoo.document.restapi.DocumentOperationExecutorMock;
-import com.yahoo.document.restapi.resource.DocumentV1ApiHandler.DocumentOperationParser;
+import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
+import com.yahoo.document.restapi.resource.DocumentV1ApiHandler.StorageCluster;
import com.yahoo.document.update.FieldUpdate;
+import com.yahoo.documentapi.AckToken;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
+import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentAccessParams;
-import com.yahoo.documentapi.local.LocalDocumentAccess;
+import com.yahoo.documentapi.DocumentIdResponse;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.DocumentResponse;
+import com.yahoo.documentapi.DumpVisitorDataHandler;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.SubscriptionParameters;
+import com.yahoo.documentapi.SubscriptionSession;
+import com.yahoo.documentapi.SyncParameters;
+import com.yahoo.documentapi.SyncSession;
+import com.yahoo.documentapi.UpdateResponse;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorDestinationParameters;
+import com.yahoo.documentapi.VisitorDestinationSession;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorResponse;
+import com.yahoo.documentapi.VisitorSession;
import com.yahoo.jdisc.Metric;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.messagebus.Trace;
import com.yahoo.metrics.simple.MetricReceiver;
import com.yahoo.searchdefinition.derived.Deriver;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.JsonFormat;
import com.yahoo.slime.SlimeUtils;
import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -33,14 +57,16 @@ import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.INSUFFICIENT_STORAGE;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
@@ -50,12 +76,28 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* @author jonmv
*/
public class DocumentV1ApiTest {
+ final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder()
+ .cluster("content",
+ new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music",
+ new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace(FixedBucketSpaces.defaultSpace())))
+ .build();
+ final ClusterListConfig clusterConfig = new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder().configid("config-id")
+ .name("content"))
+ .build();
+ final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder()
+ .maxThrottled(2)
+ .resendDelayMillis(1 << 30)
+ .build();
final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd").build();
final DocumentTypeManager manager = new DocumentTypeManager(docConfig);
final Document doc1 = new Document(manager.getDocumentType("music"), "id:space:music::one");
@@ -66,10 +108,11 @@ public class DocumentV1ApiTest {
doc2.setFieldValue("artist", "Asa-Chan & Jun-Ray");
}
+ final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content",
+ "config-id",
+ Map.of("music", "default")));
ManualClock clock;
- DocumentOperationParser parser;
- LocalDocumentAccess access;
- DocumentOperationExecutorMock executor;
+ MockDocumentAccess access;
Metric metric;
MetricReceiver metrics;
DocumentV1ApiHandler handler;
@@ -77,12 +120,10 @@ public class DocumentV1ApiTest {
@Before
public void setUp() {
clock = new ManualClock();
- parser = new DocumentOperationParser(docConfig);
- access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(docConfig));
- executor = new DocumentOperationExecutorMock();
+ access = new MockDocumentAccess(docConfig);
metric = new NullMetric();
metrics = new MetricReceiver.MockReceiver();
- handler = new DocumentV1ApiHandler(clock, executor, parser, metric, metrics);
+ handler = new DocumentV1ApiHandler(clock, metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig);
}
@After
@@ -91,32 +132,78 @@ public class DocumentV1ApiTest {
}
@Test
+ public void testResolveCluster() {
+ assertEquals("content",
+ DocumentV1ApiHandler.resolveCluster(Optional.empty(), clusters).name());
+ assertEquals("[Storage:cluster=content;clusterconfigid=config-id]",
+ DocumentV1ApiHandler.resolveCluster(Optional.of("content"), clusters).route());
+ try {
+ DocumentV1ApiHandler.resolveCluster(Optional.empty(), Map.of());
+ fail("Should fail without any clusters");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage());
+ }
+ try {
+ DocumentV1ApiHandler.resolveCluster(Optional.of("blargh"), clusters);
+ fail("Should not find this cluster");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage());
+ }
+ try {
+ Map<String, StorageCluster> twoClusters = new TreeMap<>();
+ twoClusters.put("one", new StorageCluster("one", "one-config", Map.of()));
+ twoClusters.put("two", new StorageCluster("two", "two-config", Map.of()));
+ DocumentV1ApiHandler.resolveCluster(Optional.empty(), twoClusters);
+ fail("More than one cluster and no document type should fail");
+ }
+ catch (IllegalArgumentException e) {
+ assertEquals("Please specify one of the content clusters in your Vespa deployment: 'one', 'two'", e.getMessage());
+ }
+ StorageCluster cluster = DocumentV1ApiHandler.resolveCluster(Optional.of("content"), clusters);
+ assertEquals(FixedBucketSpaces.defaultSpace(),
+ DocumentV1ApiHandler.resolveBucket(cluster, Optional.of("music"), List.of(), Optional.empty()));
+ assertEquals(FixedBucketSpaces.globalSpace(),
+ DocumentV1ApiHandler.resolveBucket(cluster, Optional.empty(), List.of(FixedBucketSpaces.globalSpace()), Optional.of("global")));
+ }
+
+ @Test
public void testResponses() {
+ Executor visitCompleter = Executors.newSingleThreadExecutor();
RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler);
// GET at non-existent path returns 404 with available paths
var response = driver.sendRequest("http://localhost/document/v1/not-found");
assertSameJson("{" +
" \"pathId\": \"/document/v1/not-found\"," +
" \"message\": \"Nothing at '/document/v1/not-found'. Available paths are:\\n" +
- "/document/v1/\\n" +
- "/document/v1/{namespace}/{documentType}/docid/\\n" +
- "/document/v1/{namespace}/{documentType}/group/{group}/\\n" +
- "/document/v1/{namespace}/{documentType}/number/{number}/\\n" +
- "/document/v1/{namespace}/{documentType}/docid/{docid}\\n" +
- "/document/v1/{namespace}/{documentType}/group/{group}/{docid}\\n" +
- "/document/v1/{namespace}/{documentType}/number/{number}/{docid}\"" +
- "}",
- response.readAll());
+ "/document/v1/\\n" +
+ "/document/v1/{namespace}/{documentType}/docid/\\n" +
+ "/document/v1/{namespace}/{documentType}/group/{group}/\\n" +
+ "/document/v1/{namespace}/{documentType}/number/{number}/\\n" +
+ "/document/v1/{namespace}/{documentType}/docid/{docid}\\n" +
+ "/document/v1/{namespace}/{documentType}/group/{group}/{docid}\\n" +
+ "/document/v1/{namespace}/{documentType}/number/{number}/{docid}\"" +
+ "}", response.readAll());
assertEquals("application/json; charset=UTF-8", response.getResponse().headers().getFirst("Content-Type"));
assertEquals(404, response.getStatus());
// GET at root is a visit. Numeric parameters have an upper bound.
- response = driver.sendRequest("http://localhost/document/v1?cluster=lackluster&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
- "&selection=all%20the%20things&fieldSet=[id]&continuation=token");
- executor.lastVisitContext().document(doc1);
- executor.lastVisitContext().document(doc2);
- executor.lastVisitContext().document(doc3);
- executor.lastVisitContext().success(Optional.of("token"));
+ access.expect(parameters -> {
+ assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", parameters.getRoute().toString());
+ assertEquals("default", parameters.getBucketSpace());
+ assertEquals(1024, parameters.getMaxTotalHits());
+ assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount());
+ assertEquals("[id]", parameters.getFieldSet());
+ assertEquals("(all the things)", parameters.getDocumentSelection());
+ // Put some documents in the response
+ ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc1, 0);
+ ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc2, 0);
+ ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc3, 0);
+ visitCompleter.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "message"));
+ });
+ response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" +
+ "&selection=all%20the%20things&fieldSet=[id]");
assertSameJson("{" +
" \"pathId\": \"/document/v1\"," +
" \"documents\": [" +
@@ -136,125 +223,129 @@ public class DocumentV1ApiTest {
" \"id\": \"id:space:music:g=a:three\"," +
" \"fields\": {}" +
" }" +
- " ]," +
- " \"continuation\": \"token\"" +
- "}",
- response.readAll());
+ " ]" +
+ "}", response.readAll());
assertEquals(200, response.getStatus());
- assertEquals(VisitorOptions.builder().cluster("lackluster").bucketSpace("default").wantedDocumentCount(1024)
- .concurrency(100).selection("all the things").fieldSet("[id]").continuation("token").build(),
- executor.lastOptions());
// GET with namespace and document type is a restricted visit.
- response = driver.sendRequest("http://localhost/document/v1/space/music/docid");
- executor.lastVisitContext().error(BAD_REQUEST, "nope");
+ access.expect(parameters -> {
+ assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection());
+ throw new IllegalArgumentException("parse failure");
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + new ProgressToken().serializeToString());
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/docid\"," +
- " \"documents\": []," +
- " \"message\": \"nope\"" +
- "}",
- response.readAll());
+ " \"message\": \"parse failure\"" +
+ "}", response.readAll());
assertEquals(400, response.getStatus());
- assertEquals(VisitorOptions.builder().namespace("space").documentType("music").build(),
- executor.lastOptions());
// GET with namespace, document type and group is a restricted visit.
- response = driver.sendRequest("http://localhost/document/v1/space/music/group/best");
- executor.lastVisitContext().error(ERROR, "error");
+ access.expect(parameters -> {
+ assertEquals("(music) and (id.namespace=='space') and (id.group=='best\\'')", parameters.getDocumentSelection());
+ visitCompleter.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "error"));
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/group/best%27");
assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/group/best\"," +
+ " \"pathId\": \"/document/v1/space/music/group/best%27\"," +
" \"documents\": []," +
" \"message\": \"error\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(500, response.getStatus());
- assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of("best")).build(),
- executor.lastOptions());
// GET with namespace, document type and number is a restricted visit.
+ access.expect(parameters -> {
+ assertEquals("(music) and (id.namespace=='space') and (id.user==123)", parameters.getDocumentSelection());
+ visitCompleter.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "aborted"));
+ });
response = driver.sendRequest("http://localhost/document/v1/space/music/number/123");
- executor.lastVisitContext().success(Optional.empty());
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/123\"," +
" \"documents\": []" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(200, response.getStatus());
- assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of(123)).build(),
- executor.lastOptions());
// GET with full document ID is a document get operation which returns 404 when no document is found
- response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=lackluster&fieldSet=go");
- executor.lastOperationContext().success(Optional.empty());
+ access.session.expect((id, parameters) -> {
+ assertEquals(doc1.getId(), id);
+ assertEquals(parameters().withRoute("[Storage:cluster=content;clusterconfigid=config-id]").withFieldSet("go"), parameters);
+ parameters.responseHandler().get().handleResponse(new DocumentResponse(0, null));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=content&fieldSet=go");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/docid/one\"," +
" \"id\": \"id:space:music::one\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(404, response.getStatus());
- assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation());
- assertEquals(parameters().withRoute("route-to-lackluster").withFieldSet("go"), executor.lastParameters());
// GET with full document ID is a document get operation.
+ access.session.expect((id, parameters) -> {
+ assertEquals(doc1.getId(), id);
+ assertEquals(parameters(), parameters);
+ parameters.responseHandler().get().handleResponse(new DocumentResponse(0, doc1));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?");
- executor.lastOperationContext().success(Optional.of(doc1));
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/docid/one\"," +
" \"id\": \"id:space:music::one\"," +
" \"fields\": {" +
" \"artist\": \"Tom Waits\"" +
" }" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(200, response.getStatus());
- assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation());
- assertEquals(parameters(), executor.lastParameters());
// GET with not encoded / in user specified part of document id is a 404
+ access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); });
response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one/two/three");
response.readAll(); // Must drain body.
assertEquals(404, response.getStatus());
// POST with a document payload is a document put operation.
+ access.session.expect((put, parameters) -> {
+ DocumentPut expectedPut = new DocumentPut(doc2);
+ expectedPut.setCondition(new TestAndSetCondition("test it"));
+ assertEquals(expectedPut, put);
+ assertEquals(parameters(), parameters);
+ parameters.responseHandler().get().handleResponse(new DocumentResponse(0, doc2));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST,
"{" +
" \"fields\": {" +
" \"artist\": \"Asa-Chan & Jun-Ray\"" +
" }" +
"}");
- executor.lastOperationContext().success(Optional.empty());
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"id\": \"id:space:music:n=1:two\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(200, response.getStatus());
- DocumentPut put = new DocumentPut(doc2);
- put.setCondition(new TestAndSetCondition("test it"));
- assertEquals(put, executor.lastOperation());
- assertEquals(parameters(), executor.lastParameters());
// PUT with a document update payload is a document update operation.
+ access.session.expect((update, parameters) -> {
+ DocumentUpdate expectedUpdate = new DocumentUpdate(doc3.getDataType(), doc3.getId());
+ expectedUpdate.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl")));
+ expectedUpdate.setCreateIfNonExistent(true);
+ assertEquals(expectedUpdate, update);
+ assertEquals(parameters(), parameters);
+ parameters.responseHandler().get().handleResponse(new UpdateResponse(0, true));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
response = driver.sendRequest("http://localhost/document/v1/space/music/group/a/three?create=true", PUT,
"{" +
" \"fields\": {" +
" \"artist\": { \"assign\": \"Lisa Ekdahl\" }" +
" }" +
"}");
- executor.lastOperationContext().success(Optional.empty());
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/group/a/three\"," +
" \"id\": \"id:space:music:g=a:three\"" +
- "}",
- response.readAll());
- DocumentUpdate update = new DocumentUpdate(doc3.getDataType(), doc3.getId());
- update.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl")));
- update.setCreateIfNonExistent(true);
- assertEquals(update, executor.lastOperation());
- assertEquals(parameters(), executor.lastParameters());
+ "}", response.readAll());
assertEquals(200, response.getStatus());
// POST with illegal payload is a 400
+ access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); });
response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST,
"{" +
" ┻━┻︵ \\(°□°)/ ︵ ┻━┻" +
@@ -265,6 +356,7 @@ public class DocumentV1ApiTest {
assertEquals(400, response.getStatus());
// PUT on a unknown document type is a 400
+ access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); });
response = driver.sendRequest("http://localhost/document/v1/space/house/group/a/three?create=true", PUT,
"{" +
" \"fields\": {" +
@@ -274,101 +366,229 @@ public class DocumentV1ApiTest {
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/house/group/a/three\"," +
" \"message\": \"Document type house does not exist\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(400, response.getStatus());
// DELETE with full document ID is a document remove operation.
- response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?route=route", DELETE);
- executor.lastOperationContext().success(Optional.empty());
+ access.session.expect((remove, parameters) -> {
+ DocumentRemove expectedRemove = new DocumentRemove(doc2.getId());
+ expectedRemove.setCondition(new TestAndSetCondition("false"));
+ assertEquals(new DocumentRemove(doc2.getId()), remove);
+ assertEquals(parameters.withRoute("route"), parameters);
+ parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId()));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
+ response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?route=route&condition=false", DELETE);
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"id\": \"id:space:music:n=1:two\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(200, response.getStatus());
- assertEquals(new DocumentRemove(doc2.getId()), executor.lastOperation());
- assertEquals(parameters().withRoute("route"), executor.lastParameters());
// GET with non-existent cluster is a 400
+ access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); });
response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?cluster=throw-me");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"message\": \"throw-me\"" +
- "}",
- response.readAll());
+ " \"message\": \"Your Vespa deployment has no content cluster 'throw-me', only 'content'\"" +
+ "}", response.readAll());
assertEquals(400, response.getStatus());
- // TIMEOUT is a 504
+ // INSUFFICIENT_STORAGE is a 507
+ access.session.expect((id, parameters) -> {
+ parameters.responseHandler().get().handleResponse(new Response(0, "disk full", Response.Outcome.INSUFFICIENT_STORAGE));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
- executor.lastOperationContext().error(TIMEOUT, "timeout");
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"id\": \"id:space:music:n=1:two\"," +
- " \"message\": \"timeout\"" +
- "}",
- response.readAll());
- assertEquals(504, response.getStatus());
-
- // INSUFFICIENT_STORAGE is a 504
- response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
- executor.lastOperationContext().error(INSUFFICIENT_STORAGE, "disk full");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"id\": \"id:space:music:n=1:two\"," +
" \"message\": \"disk full\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(507, response.getStatus());
- // OVERLOAD is a 429
- response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
- executor.lastOperationContext().error(OVERLOAD, "overload");
- assertSameJson("{" +
- " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
- " \"id\": \"id:space:music:n=1:two\"," +
- " \"message\": \"overload\"" +
- "}",
- response.readAll());
- assertEquals(429, response.getStatus());
-
// PRECONDITION_FAILED is a 412
+ access.session.expect((id, parameters) -> {
+ parameters.responseHandler().get().handleResponse(new Response(0, "no dice", Response.Outcome.CONDITION_FAILED));
+ return new Result(Result.ResultType.SUCCESS, null);
+ });
response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
- executor.lastOperationContext().error(PRECONDITION_FAILED, "no dice");
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/number/1/two\"," +
" \"id\": \"id:space:music:n=1:two\"," +
" \"message\": \"no dice\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(412, response.getStatus());
- // Client close during processing gives empty body
- response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
- response.clientClose();
- executor.lastOperationContext().error(TIMEOUT, "no dice");
- assertEquals("", response.readAll());
- assertEquals(504, response.getStatus());
-
// OPTIONS gets options
+ access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); });
response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", OPTIONS);
assertEquals("", response.readAll());
assertEquals(204, response.getStatus());
assertEquals("GET,POST,PUT,DELETE", response.getResponse().headers().getFirst("Allow"));
// PATCH is not allowed
+ access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); });
response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", PATCH);
assertSameJson("{" +
" \"pathId\": \"/document/v1/space/music/docid/one\"," +
" \"message\": \"'PATCH' not allowed at '/document/v1/space/music/docid/one'. Allowed methods are: GET, POST, PUT, DELETE\"" +
- "}",
- response.readAll());
+ "}", response.readAll());
assertEquals(405, response.getStatus());
+ // OVERLOAD is a 429
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, new Error("overload")));
+ var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
+ var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
+ var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two");
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" +
+ "}", response3.readAll());
+ assertEquals(429, response3.getStatus());
+ access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, new Error("error")));
+ handler.dispatchEnqueued();
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"error\"" +
+ "}", response1.readAll());
+ assertEquals(500, response1.getStatus());
+ assertSameJson("{" +
+ " \"pathId\": \"/document/v1/space/music/number/1/two\"," +
+ " \"message\": \"error\"" +
+ "}", response2.readAll());
+ assertEquals(500, response2.getStatus());
+
driver.close();
}
- void assertSameJson(String expected, String actual) {
+
+ static class MockDocumentAccess extends DocumentAccess {
+
+ private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>();
+ private final MockAsyncSession session = new MockAsyncSession();
+
+ MockDocumentAccess(DocumentmanagerConfig config) {
+ super(new DocumentAccessParams().setDocumentmanagerConfig(config));
+ }
+
+ @Override
+ public SyncSession createSyncSession(SyncParameters parameters) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public AsyncSession createAsyncSession(AsyncParameters parameters) {
+ return session;
+ }
+
+ @Override
+ public VisitorSession createVisitorSession(VisitorParameters parameters) {
+ expectations.get().accept(parameters);
+ return new VisitorSession() {
+ @Override public boolean isDone() { return false; }
+ @Override public ProgressToken getProgress() { return null; }
+ @Override public Trace getTrace() { return null; }
+ @Override public boolean waitUntilDone(long timeoutMs) { return false; }
+ @Override public void ack(AckToken token) { }
+ @Override public void abort() { }
+ @Override public VisitorResponse getNext() { return null; }
+ @Override public VisitorResponse getNext(int timeoutMilliseconds) { return null; }
+ @Override public void destroy() { }
+ };
+ }
+
+ @Override
+ public VisitorDestinationSession createVisitorDestinationSession(VisitorDestinationParameters parameters) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public SubscriptionSession createSubscription(SubscriptionParameters parameters) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public SubscriptionSession openSubscription(SubscriptionParameters parameters) {
+ throw new AssertionError("Not used");
+ }
+
+ public void expect(Consumer<VisitorParameters> expectations) {
+ this.expectations.set(expectations);
+ }
+
+ }
+
+
+ static class MockAsyncSession implements AsyncSession {
+
+ private final AtomicReference<BiFunction<Object, DocumentOperationParameters, Result>> expectations = new AtomicReference<>();
+
+ @Override
+ public Result put(Document document) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) {
+ return expectations.get().apply(documentPut, parameters);
+ }
+
+ @Override
+ public Result get(DocumentId id) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public Result get(DocumentId id, DocumentOperationParameters parameters) {
+ return expectations.get().apply(id, parameters);
+ }
+
+ @Override
+ public Result remove(DocumentId id) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public Result remove(DocumentRemove remove, DocumentOperationParameters parameters) {
+ return expectations.get().apply(remove, parameters);
+ }
+
+ @Override
+ public Result update(DocumentUpdate update) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public Result update(DocumentUpdate update, DocumentOperationParameters parameters) {
+ return expectations.get().apply(update, parameters);
+ }
+
+ @Override
+ public double getCurrentWindowSize() {
+ throw new AssertionError("Not used");
+ }
+
+ public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) {
+ this.expectations.set(expectations);
+ }
+
+ @Override
+ public Response getNext() {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public Response getNext(int timeoutMilliseconds) {
+ throw new AssertionError("Not used");
+ }
+
+ @Override
+ public void destroy() { }
+
+ }
+
+ static void assertSameJson(String expected, String actual) {
ByteArrayOutputStream expectedPretty = new ByteArrayOutputStream();
ByteArrayOutputStream actualPretty = new ByteArrayOutputStream();
JsonFormat formatter = new JsonFormat(false);