diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-10-13 20:18:14 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-10-13 20:18:14 +0200 |
commit | 53e68ede523a33a6f7e87f991ee7c392ee3a088e (patch) | |
tree | 13b5528b38958f2f4735749dd356543462580290 /vespaclient-container-plugin | |
parent | 2931da13cbf55cb62ebc384d568914673f387bdf (diff) |
Tests, more config and various fixes
Diffstat (limited to 'vespaclient-container-plugin')
7 files changed, 429 insertions, 1532 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java deleted file mode 100644 index d56c8c1524e..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java +++ /dev/null @@ -1,311 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.document.restapi; - -import com.yahoo.document.Document; -import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentPut; -import com.yahoo.document.DocumentUpdate; -import com.yahoo.document.FixedBucketSpaces; -import com.yahoo.document.fieldset.AllFields; -import com.yahoo.documentapi.DocumentOperationParameters; -import com.yahoo.documentapi.ProgressToken; -import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.messagebus.StaticThrottlePolicy; -import com.yahoo.text.Text; - -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.StringJoiner; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Stream; - -/** - * Wraps the document API with an executor that can retry and time out document operations, - * as well as compute the required visitor parameters for visitor sessions. - * - * @author jonmv - */ -public interface DocumentOperationExecutor { - - default void shutdown() { } - - void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context); - - void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context); - - void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context); - - void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context); - - void visit(VisitorOptions options, VisitOperationsContext context); - - String routeToCluster(String cluster); - - enum ErrorType { - OVERLOAD, - NOT_FOUND, - PRECONDITION_FAILED, - BAD_REQUEST, - ERROR, - TIMEOUT, - INSUFFICIENT_STORAGE; - } - - - /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */ - class Context<T> { - - private final AtomicBoolean handled = new AtomicBoolean(); - private final BiConsumer<ErrorType, String> onError; - private final Consumer<T> onSuccess; - - Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) { - this.onError = onError; - this.onSuccess = onSuccess; - } - - public void error(ErrorType type, String message) { - if ( ! handled.getAndSet(true)) - onError.accept(type, message); - } - - public void success(T result) { - if ( ! handled.getAndSet(true)) - onSuccess.accept(result); - } - - public boolean handled() { - return handled.get(); - } - - } - - - /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */ - class VisitOperationsContext extends Context<Optional<String>> { - - private final Consumer<Document> onDocument; - - public VisitOperationsContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<String>> onSuccess, Consumer<Document> onDocument) { - super(onError, onSuccess); - this.onDocument = onDocument; - } - - public void document(Document document) { - if ( ! handled()) - onDocument.accept(document); - } - - } - - - /** Context for a document operation. */ - class OperationContext extends Context<Optional<Document>> { - - public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) { - super(onError, onSuccess); - } - - } - - - class VisitorOptions { - - final Optional<String> cluster; - final Optional<String> namespace; - final Optional<String> documentType; - final Optional<Group> group; - final Optional<String> selection; - final Optional<String> fieldSet; - final Optional<String> continuation; - final Optional<String> bucketSpace; - final Optional<Integer> wantedDocumentCount; - final Optional<Integer> concurrency; - - private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace, - Optional<Group> group, Optional<String> selection, Optional<String> fieldSet, - Optional<String> continuation, Optional<String> bucketSpace, - Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) { - this.cluster = cluster; - this.namespace = namespace; - this.documentType = documentType; - this.group = group; - this.selection = selection; - this.fieldSet = fieldSet; - this.continuation = continuation; - this.bucketSpace = bucketSpace; - this.wantedDocumentCount = wantedDocumentCount; - this.concurrency = concurrency; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - VisitorOptions that = (VisitorOptions) o; - return cluster.equals(that.cluster) && - namespace.equals(that.namespace) && - documentType.equals(that.documentType) && - group.equals(that.group) && - selection.equals(that.selection) && - fieldSet.equals(that.fieldSet) && - continuation.equals(that.continuation) && - bucketSpace.equals(that.bucketSpace) && - wantedDocumentCount.equals(that.wantedDocumentCount) && - concurrency.equals(that.concurrency); - } - - @Override - public int hashCode() { - return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency); - } - - @Override - public String toString() { - return "VisitorOptions{" + - "cluster=" + cluster + - ", namespace=" + namespace + - ", documentType=" + documentType + - ", group=" + group + - ", selection=" + selection + - ", fieldSet=" + fieldSet + - ", continuation=" + continuation + - ", bucketSpace=" + bucketSpace + - ", wantedDocumentCount=" + wantedDocumentCount + - ", concurrency=" + concurrency + - '}'; - } - - public static Builder builder() { return new Builder(); } - - - public static class Builder { - - private String cluster; - private String documentType; - private String namespace; - private Group group; - private String selection; - private String fieldSet; - private String continuation; - private String bucketSpace; - private Integer wantedDocumentCount; - private Integer concurrency; - - public Builder cluster(String cluster) { - this.cluster = cluster; - return this; - } - - public Builder documentType(String documentType) { - this.documentType = documentType; - return this; - } - - public Builder namespace(String namespace) { - this.namespace = namespace; - return this; - } - - public Builder group(Group group) { - this.group = group; - return this; - } - - public Builder selection(String selection) { - this.selection = selection; - return this; - } - - public Builder fieldSet(String fieldSet) { - this.fieldSet = fieldSet; - return this; - } - - public Builder continuation(String continuation) { - this.continuation = continuation; - return this; - } - - public Builder bucketSpace(String bucketSpace) { - this.bucketSpace = bucketSpace; - return this; - } - - public Builder wantedDocumentCount(Integer wantedDocumentCount) { - this.wantedDocumentCount = wantedDocumentCount; - return this; - } - - public Builder concurrency(Integer concurrency) { - this.concurrency = concurrency; - return this; - } - - public VisitorOptions build() { - return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType), - Optional.ofNullable(namespace), Optional.ofNullable(group), - Optional.ofNullable(selection), Optional.ofNullable(fieldSet), - Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace), - Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency)); - } - - } - - } - - - class Group { - - private final String value; - private final String docIdPart; - private final String selection; - - private Group(String value, String docIdPart, String selection) { - Text.validateTextString(value) - .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); }); - this.value = value; - this.docIdPart = docIdPart; - this.selection = selection; - } - - public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); } - public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); } - - public String value() { return value; } - public String docIdPart() { return docIdPart; } - public String selection() { return selection; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Group group = (Group) o; - return value.equals(group.value) && - docIdPart.equals(group.docIdPart) && - selection.equals(group.selection); - } - - @Override - public int hashCode() { - return Objects.hash(value, docIdPart, selection); - } - - @Override - public String toString() { - return "Group{" + - "value='" + value + '\'' + - ", docIdPart='" + docIdPart + '\'' + - ", selection='" + selection + '\'' + - '}'; - } - - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java deleted file mode 100644 index 309bed38c0b..00000000000 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java +++ /dev/null @@ -1,509 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.document.restapi; - -import com.yahoo.cloud.config.ClusterListConfig; -import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.document.Document; -import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentPut; -import com.yahoo.document.DocumentUpdate; -import com.yahoo.document.FixedBucketSpaces; -import com.yahoo.document.fieldset.AllFields; -import com.yahoo.document.select.parser.ParseException; -import com.yahoo.documentapi.AsyncParameters; -import com.yahoo.documentapi.AsyncSession; -import com.yahoo.documentapi.DocumentAccess; -import com.yahoo.documentapi.DocumentOperationParameters; -import com.yahoo.documentapi.DocumentResponse; -import com.yahoo.documentapi.DumpVisitorDataHandler; -import com.yahoo.documentapi.ProgressToken; -import com.yahoo.documentapi.Response; -import com.yahoo.documentapi.ResponseHandler; -import com.yahoo.documentapi.Result; -import com.yahoo.documentapi.VisitorControlHandler; -import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.VisitorSession; -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.messagebus.StaticThrottlePolicy; -import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; -import com.yahoo.yolean.Exceptions; - -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.StringJoiner; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiPredicate; -import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.logging.Logger; -import java.util.stream.Stream; - -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.INSUFFICIENT_STORAGE; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; -import static java.util.Objects.requireNonNull; -import static java.util.logging.Level.SEVERE; -import static java.util.logging.Level.WARNING; -import static java.util.stream.Collectors.toMap; -import static java.util.stream.Collectors.toUnmodifiableMap; - -/** - * Encapsulates a document access and supports running asynchronous document - * operations and visits against this, with retries and optional timeouts. - * - * @author jonmv - */ -public class DocumentOperationExecutorImpl implements DocumentOperationExecutor { - - private static final Logger log = Logger.getLogger(DocumentOperationExecutorImpl.class.getName()); - - private final Duration visitTimeout; - private final long maxThrottled; - private final DocumentAccess access; - private final AsyncSession asyncSession; - private final Map<String, StorageCluster> clusters; - private final Clock clock; - private final DelayQueue throttled; - private final DelayQueue timeouts; - private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); - private final ExecutorService visitSessionShutdownExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("visit-session-shutdown-")); - - public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig, - DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) { - this(Duration.ofMillis(executorConfig.resendDelayMillis()), - Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()), - Duration.ofSeconds(executorConfig.visitTimeoutSeconds()), - executorConfig.maxThrottled(), - access, - parseClusters(clustersConfig, bucketsConfig), - clock); - } - - DocumentOperationExecutorImpl(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled, - DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) { - this.visitTimeout = requireNonNull(visitTimeout); - this.maxThrottled = maxThrottled; - this.access = requireNonNull(access); - this.asyncSession = access.createAsyncSession(new AsyncParameters()); - this.clock = requireNonNull(clock); - this.clusters = Map.copyOf(clusters); - this.throttled = new DelayQueue(maxThrottled, this::send, Duration.ZERO, clock, "throttle"); - this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> { - context.error(TIMEOUT, "Timed out after " + defaultTimeout); - return true; - }, defaultTimeout, clock, "timeout"); - } - - private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) { - if (options.cluster.isEmpty() && options.documentType.isEmpty()) - throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); - - VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection, - options.documentType, - options.namespace.map(value -> "id.namespace=='" + value + "'"), - options.group.map(Group::selection)) - .flatMap(Optional::stream) - .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right - StringJoiner::add, - StringJoiner::merge) - .toString()); - - options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); - parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME))); - options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); }); - parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10)); - options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); }); - parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1))); - parameters.setTimeoutMs(visitTimeout.toMillis()); - parameters.visitInconsistentBuckets(true); - parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); - - StorageCluster storageCluster = resolveCluster(options.cluster, clusters); - parameters.setRoute(storageCluster.route()); - parameters.setBucketSpace(resolveBucket(storageCluster, - options.documentType, - List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), - options.bucketSpace)); - - return parameters; - } - - /** Assumes this stops receiving operations roughly when this is called, then waits up to 20 seconds to drain operations. */ - @Override - public void shutdown() { - long shutdownMillis = clock.instant().plusSeconds(20).toEpochMilli(); - visitSessionShutdownExecutor.shutdown(); - visits.values().forEach(VisitorSession::destroy); - Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(10), - context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown")); - Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(15), - context -> context.error(TIMEOUT, "Timed out due to shutdown")); - try { - throttleShutdown.get(Math.max(1, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); - timeoutShutdown.get(Math.max(1, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); - visitSessionShutdownExecutor.awaitTermination(Math.max(1, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); - } - catch (InterruptedException | ExecutionException | TimeoutException e) { - throttleShutdown.cancel(true); - timeoutShutdown.cancel(true); - log.log(WARNING, "Exception shutting down " + getClass().getName(), e); - } - } - - @Override - public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.get(id, parameters.withResponseHandler(handlerOf(parameters, context))), context); - } - - @Override - public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.put(put, parameters.withResponseHandler(handlerOf(parameters, context))), context); - } - - @Override - public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.update(update, parameters.withResponseHandler(handlerOf(parameters, context))), context); - } - - @Override - public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.remove(id, parameters.withResponseHandler(handlerOf(parameters, context))), context); - } - - @Override - public void visit(VisitorOptions options, VisitOperationsContext context) { - try { - AtomicBoolean done = new AtomicBoolean(false); - VisitorParameters parameters = asParameters(options, clusters, visitTimeout); - parameters.setLocalDataHandler(new DumpVisitorDataHandler() { - @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); } - @Override public void onRemove(DocumentId id) { } // We don't visit removes here. - }); - parameters.setControlHandler(new VisitorControlHandler() { - @Override public void onDone(CompletionCode code, String message) { - super.onDone(code, message); - switch (code) { - case TIMEOUT: - if ( ! hasVisitedAnyBuckets()) - context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout); - case SUCCESS: // intentional fallthrough - case ABORTED: - context.success(Optional.ofNullable(getProgress()) - .filter(progress -> ! progress.isFinished()) - .map(ProgressToken::serializeToString)); - break; - default: - context.error(ERROR, message != null ? message : "Visiting failed"); - } - done.set(true); // This may be reached before dispatching thread is done putting us in the map. - visits.computeIfPresent(this, (__, session) -> { - visitSessionShutdownExecutor.execute(() -> session.destroy()); - return null; - }); - } - }); - visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); - if (done.get()) - visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { - visitSessionShutdownExecutor.execute(() -> session.destroy()); - return null; - }); - } - catch (IllegalArgumentException | ParseException e) { - context.error(BAD_REQUEST, Exceptions.toMessageString(e)); - } - catch (RuntimeException e) { - context.error(ERROR, Exceptions.toMessageString(e)); - } - } - - @Override - public String routeToCluster(String cluster) { - return resolveCluster(Optional.of(cluster), clusters).route(); - } - - private ResponseHandler handlerOf(DocumentOperationParameters parameters, OperationContext context) { - return response -> { - parameters.responseHandler().ifPresent(originalHandler -> originalHandler.handleResponse(response)); - if (response.isSuccess()) - context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) - : Optional.empty()); - else - context.error(toErrorType(response.outcome()), response.getTextMessage()); - }; - } - - /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */ - private void accept(Supplier<Result> operation, OperationContext context) { - timeouts.add(operation, context); - if (throttled.size() > 0 || ! send(operation, context)) - if ( ! throttled.add(operation, context)) - context.error(OVERLOAD, maxThrottled + " requests already in retry queue"); - } - - /** Attempts to send the given operation through the async session of this, returning {@code false} if throttled. */ - private boolean send(Supplier<Result> operation, OperationContext context) { - Result result = operation.get(); - switch (result.type()) { - case SUCCESS: - return true; - case TRANSIENT_ERROR: - return false; - default: - log.log(WARNING, "Unknown result type '" + result.type() + "'"); - case FATAL_ERROR: // intentional fallthrough - context.error(ERROR, result.getError().getMessage()); - return true; // Request handled, don't retry. - } - } - - private static ErrorType toErrorType(Response.Outcome outcome) { - switch (outcome) { - case NOT_FOUND: - return NOT_FOUND; - case CONDITION_FAILED: - return PRECONDITION_FAILED; - case INSUFFICIENT_STORAGE: - return INSUFFICIENT_STORAGE; - default: - log.log(WARNING, "Unexpected response outcome: " + outcome); - case ERROR: // intentional fallthrough - return ERROR; - } - } - - - /** - * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them. - * - * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility - * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions. - */ - static class DelayQueue { - - private final long maxSize; - private final Clock clock; - private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>(); - private final AtomicLong size = new AtomicLong(0); - private final Thread maintainer; - private final Duration delay; - private final long defaultWaitMillis; - - public DelayQueue(long maxSize, BiPredicate<Supplier<Result>, OperationContext> action, - Duration delay, Clock clock, String threadName) { - if (maxSize < 0) - throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize); - if (delay.isNegative()) - throw new IllegalArgumentException("Delay cannot be negative, but was " + delay); - - this.maxSize = maxSize; - this.delay = delay; - this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts. - this.clock = requireNonNull(clock); - this.maintainer = new DaemonThreadFactory("document-operation-executor-" + threadName).newThread(() -> maintain(action)); - this.maintainer.start(); - } - - boolean add(Supplier<Result> operation, OperationContext context) { - if (size.incrementAndGet() > maxSize) { - size.decrementAndGet(); - return false; - } - return queue.add(new Delayed(clock.instant().plus(delay), operation, context)); - } - - long size() { return size.get(); } - - Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) { - ExecutorService shutdownService = Executors.newSingleThreadExecutor(); - Future<?> future = shutdownService.submit(() -> { - try { - long doomMillis = clock.millis() + grace.toMillis(); - while (size.get() > 0 && clock.millis() < doomMillis) - Thread.sleep(100); - } - finally { - maintainer.interrupt(); - for (Delayed delayed; (delayed = queue.poll()) != null; ) { - size.decrementAndGet(); - onShutdown.accept(delayed.context()); - } - } - return null; - }); - shutdownService.shutdown(); - return future; - } - - /** - * Repeatedly loops through the queue, evicting already handled entries and processing those - * which have become ready since last time, then waits until new items are guaranteed to be ready, - * or until it's time for a new run just to ensure GC of handled entries. - * The entries are assumed to always be added to the back of the queue, with the same delay. - * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time - * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first. - */ - private void maintain(BiPredicate<Supplier<Result>, OperationContext> action) { - while ( ! Thread.currentThread().isInterrupted()) { - try { - Instant waitUntil = null; - Iterator<Delayed> operations = queue.iterator(); - boolean rejected = false; - while (operations.hasNext()) { - Delayed delayed = operations.next(); - // Already handled: remove and continue. - if (delayed.context().handled()) { - operations.remove(); - size.decrementAndGet(); - continue; - } - // Ready for action: remove from queue and run unless an operation was already rejected. - if (delayed.readyAt().isBefore(clock.instant()) && ! rejected) { - if (action.test(delayed.operation(), delayed.context())) { - operations.remove(); - size.decrementAndGet(); - continue; - } - else { // If an operation is rejected, handle no more this run, and wait a short while before retrying. - waitUntil = clock.instant().plus(Duration.ofMillis(10)); - rejected = true; - } - } - // Not yet ready for action: keep time to wake up again. - waitUntil = waitUntil != null ? waitUntil : delayed.readyAt(); - } - long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis; - synchronized (this) { - do { - notify(); - wait(Math.max(1, waitUntilMillis - clock.millis())); - } - while (clock.millis() < waitUntilMillis); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - catch (Exception e) { - log.log(SEVERE, "Exception caught by delay queue maintainer", e); - } - } - } - } - - - private static class Delayed { - - private final Supplier<Result> operation; - private final OperationContext context; - private final Instant readyAt; - - Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) { - this.readyAt = requireNonNull(readyAt); - this.context = requireNonNull(context); - this.operation = requireNonNull(operation); - } - - Supplier<Result> operation() { return operation; } - OperationContext context() { return context; } - Instant readyAt() { return readyAt; } - - } - - - static class StorageCluster { - - private final String name; - private final String configId; - private final Map<String, String> documentBuckets; - - StorageCluster(String name, String configId, Map<String, String> documentBuckets) { - this.name = requireNonNull(name); - this.configId = requireNonNull(configId); - this.documentBuckets = Map.copyOf(documentBuckets); - } - - String name() { return name; } - String configId() { return configId; } - String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; } - Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } - - } - - - static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) { - if (clusters.isEmpty()) - throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled"); - - return wanted.map(cluster -> { - if ( ! clusters.containsKey(cluster)) - throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" + - String.join("', '", clusters.keySet()) + "'"); - - return clusters.get(cluster); - }).orElseGet(() -> { - if (clusters.size() > 1) - throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" + - String.join("', '", clusters.keySet()) + "'"); - - return clusters.values().iterator().next(); - }); - } - - private static String resolveBucket(StorageCluster cluster, Optional<String> documentType, - List<String> bucketSpaces, Optional<String> bucketSpace) { - return documentType.map(type -> cluster.bucketOf(type) - .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() + - "' is not mapped to a known bucket space"))) - .or(() -> bucketSpace.map(space -> { - if ( ! bucketSpaces.contains(space)) - throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " + - String.join(", ", bucketSpaces)); - return space; - })) - .orElse(FixedBucketSpaces.defaultSpace()); - } - - - - private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) { - return clusters.storage().stream() - .collect(toUnmodifiableMap(storage -> storage.name(), - storage -> new StorageCluster(storage.name(), - storage.configid(), - buckets.cluster(storage.name()) - .documentType().entrySet().stream() - .collect(toMap(entry -> entry.getKey(), - entry -> entry.getValue().bucketSpace()))))); - } - - - // Visible for testing. - AsyncSession asyncSession() { return asyncSession; } - Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); } - void notifyMaintainers() throws InterruptedException { - synchronized (throttled) { throttled.notify(); throttled.wait(); } - synchronized (timeouts) { timeouts.notify(); timeouts.wait(); } - } - -} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 03546e02d61..5918fe9b1b0 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -162,13 +162,26 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { - this(Clock.systemUTC(), - new DocumentOperationParser(documentManagerConfig), - metric, - metricReceiver, - executorConfig.maxThrottled(), - documentAccess, - parseClusters(clusterListConfig, bucketSpacesConfig)); + this(Clock.systemUTC(), metric, metricReceiver, documentAccess, + documentManagerConfig, executorConfig, clusterListConfig, bucketSpacesConfig); + } + + DocumentV1ApiHandler(Clock clock, Metric metric, MetricReceiver metricReceiver, DocumentAccess access, + DocumentmanagerConfig documentmanagerConfig, DocumentOperationExecutorConfig executorConfig, + ClusterListConfig clusterListConfig, AllClustersBucketSpacesConfig bucketSpacesConfig) { + this.clock = clock; + this.parser = new DocumentOperationParser(documentmanagerConfig); + this.metric = metric; + this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1"); + this.maxThrottled = executorConfig.maxThrottled(); + this.access = access; + this.asyncSession = access.createAsyncSession(new AsyncParameters()); + this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig); + this.operations = new ConcurrentLinkedDeque<>(); + this.executor.scheduleWithFixedDelay(this::dispatchEnqueued, + executorConfig.resendDelayMillis(), + executorConfig.resendDelayMillis(), + TimeUnit.MILLISECONDS); } DocumentV1ApiHandler(Clock clock, DocumentOperationParser parser, Metric metric, MetricReceiver metricReceiver, @@ -243,21 +256,26 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } } + @FunctionalInterface + interface Handler { + ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler); + } + /** Defines all paths/methods handled by this handler. */ private Map<String, Map<Method, Handler>> defineApi() { Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>(); handlers.put("/document/v1/", - Map.of(GET, this::getRoot)); + Map.of(GET, this::getDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/", - Map.of(GET, this::getDocumentType)); + Map.of(GET, this::getDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/", - Map.of(GET, this::getDocumentType)); + Map.of(GET, this::getDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/", - Map.of(GET, this::getDocumentType)); + Map.of(GET, this::getDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}", Map.of(GET, this::getDocument, @@ -280,26 +298,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return Collections.unmodifiableMap(handlers); } - private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) { - enqueueAndDispatch(request, handler, () -> { - VisitorOptions options = parseOptions(request, path).build(); - return () -> { - visit(request, options, handler); - return true; // VisitorSession has its own throttle handling. - }; - }); - return ignoredContent; - } - - private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) { + private ContentChannel getDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { - VisitorOptions.Builder optionsBuilder = parseOptions(request, path); - optionsBuilder = optionsBuilder.documentType(path.documentType()); - optionsBuilder = optionsBuilder.namespace(path.namespace()); - optionsBuilder = path.group().map(optionsBuilder::group).orElse(optionsBuilder); - VisitorOptions options = optionsBuilder.build(); + VisitorParameters parameters = parseParameters(request, path); return () -> { - visit(request, options, handler); + visit(request, parameters, handler); return true; // VisitorSession has its own throttle handling. }; }); @@ -372,7 +375,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } /** Dispatches enqueued requests until one is blocked. */ - private void dispatchEnqueued() { + void dispatchEnqueued() { try { while (dispatchFirst()); } @@ -406,7 +409,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return; } Operation operation = Operation.lazilyParsed(request, handler, operationParser); - if (enqueued.get() == 0 && operation.dispatch()) // Bypass queue if it is empty. + if (enqueued.get() == 1 && operation.dispatch()) // Bypass queue if it is empty. enqueued.decrementAndGet(); else { operations.offer(operation); @@ -414,10 +417,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } } - @FunctionalInterface - interface Handler { - ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler); - } // ------------------------------------------------ Responses ------------------------------------------------ @@ -612,7 +611,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AtomicReference<Operation> operation = new AtomicReference<>(); return () -> { try { - operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch(); + return operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch(); } catch (IllegalArgumentException e) { badRequest(request, e, handler); @@ -742,62 +741,51 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------- Visits ------------------------------------------------ - private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) { - VisitorOptions.Builder options = VisitorOptions.builder(); + private VisitorParameters parseParameters(HttpRequest request, DocumentPath path) { + int wantedDocumentCount = Math.min(1 << 10, getProperty(request, WANTED_DOCUMENT_COUNT, numberParser).orElse(1 << 10)); + if (wantedDocumentCount <= 0) + throw new IllegalArgumentException("wantedDocumentCount must be positive"); - getProperty(request, SELECTION).ifPresent(options::selection); - getProperty(request, CONTINUATION).ifPresent(options::continuation); - getProperty(request, FIELD_SET).ifPresent(options::fieldSet); - getProperty(request, CLUSTER).ifPresent(options::cluster); - getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace); - getProperty(request, WANTED_DOCUMENT_COUNT, numberParser) - .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count))); - getProperty(request, CONCURRENCY, numberParser) - .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency))); + int concurrency = Math.min(100, getProperty(request, CONCURRENCY, numberParser).orElse(1)); + if (concurrency <= 0) + throw new IllegalArgumentException("concurrency must be positive"); - return options; - } - - private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) { - if (options.cluster.isEmpty() && options.documentType.isEmpty()) + Optional<String> cluster = getProperty(request, CLUSTER); + if (cluster.isEmpty() && path.documentType().isEmpty()) throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); - VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection, - options.documentType, - options.namespace.map(value -> "id.namespace=='" + value + "'"), - options.group.map(Group::selection)) + VisitorParameters parameters = new VisitorParameters(Stream.of(getProperty(request, SELECTION), + path.documentType(), + path.namespace().map(value -> "id.namespace=='" + value + "'"), + path.group().map(Group::selection)) .flatMap(Optional::stream) .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right StringJoiner::add, StringJoiner::merge) .toString()); - options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); - parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME))); - options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); }); - parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10)); - options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); }); - parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1))); + getProperty(request, CONTINUATION).map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); + parameters.setFieldSet(getProperty(request, FIELD_SET).orElse(path.documentType().map(type -> type + ":[document]").orElse(AllFields.NAME))); + parameters.setMaxTotalHits(wantedDocumentCount); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency)); parameters.setTimeoutMs(visitTimeout.toMillis()); parameters.visitInconsistentBuckets(true); parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); - StorageCluster storageCluster = resolveCluster(options.cluster, clusters); + StorageCluster storageCluster = resolveCluster(cluster, clusters); parameters.setRoute(storageCluster.route()); parameters.setBucketSpace(resolveBucket(storageCluster, - options.documentType, + path.documentType(), List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), - options.bucketSpace)); - + getProperty(request, BUCKET_SPACE))); return parameters; } - private void visit(HttpRequest request, VisitorOptions options, ResponseHandler handler) { + private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { try { JsonResponse response = JsonResponse.create(request, handler); response.writeDocumentsArrayStart(); CountDownLatch latch = new CountDownLatch(1); - VisitorParameters parameters = asParameters(options, clusters, visitTimeout); parameters.setLocalDataHandler(new DumpVisitorDataHandler() { @Override public void onDocument(Document doc, long timeStamp) { loggingException(() -> { @@ -815,7 +803,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { case TIMEOUT: if ( ! hasVisitedAnyBuckets()) { response.writeMessage("No buckets visited within timeout of " + visitTimeout); - response.commit(Response.Status.GATEWAY_TIMEOUT); + response.respond(Response.Status.GATEWAY_TIMEOUT); break; } case SUCCESS: // Intentional fallthrough. @@ -823,11 +811,11 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (getProgress() != null && ! getProgress().isFinished()) response.writeContinuation(getProgress().serializeToString()); - response.commit(Response.Status.OK); + response.respond(Response.Status.OK); break; default: response.writeMessage(message != null ? message : "Visiting failed"); - response.commit(Response.Status.INTERNAL_SERVER_ERROR); + response.respond(Response.Status.INTERNAL_SERVER_ERROR); } executor.execute(() -> { try { @@ -1103,8 +1091,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private static String resolveBucket(StorageCluster cluster, Optional<String> documentType, - List<String> bucketSpaces, Optional<String> bucketSpace) { + static String resolveBucket(StorageCluster cluster, Optional<String> documentType, + List<String> bucketSpaces, Optional<String> bucketSpace) { return documentType.map(type -> cluster.bucketOf(type) .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() + "' is not mapped to a known bucket space"))) @@ -1136,8 +1124,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } String rawPath() { return path.asString(); } - String documentType() { return requireNonNull(path.get("documentType")); } - String namespace() { return requireNonNull(path.get("namespace")); } + Optional<String> documentType() { return Optional.ofNullable(path.get("documentType")); } + Optional<String> namespace() { return Optional.ofNullable(path.get("namespace")); } Optional<Group> group() { return group; } } @@ -1157,7 +1145,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); } - public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); } + public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\\\'") + "'"); } public String value() { return value; } public String docIdPart() { return docIdPart; } diff --git a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def index 770189f90f5..686b33d8cd5 100644 --- a/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def +++ b/vespaclient-container-plugin/src/main/resources/configdefinitions/document-operation-executor.def @@ -1,15 +1,15 @@ # Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package=com.yahoo.document.restapi -# Delay before a throttled operation is retried. -resendDelayMillis int default=100 - -# Time between a document operation is received and a timeout response is sent -defaultTimeoutSeconds int default=180 - -# Time after which a visitor session times out -visitTimeoutSeconds int default=120 +# Duration for which resender thread sleeps after an operation is throttled +resendDelayMillis int default=10 # Bound on number of document operations to keep in retry queue — further operations are rejected maxThrottled int default=1000 +# Whether to perform dispatch from Jetty threads +doDispatchWithEnqueue bool default=true + +# Number of workers in the resender thread pool +resenderCount int default=1 + diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java deleted file mode 100644 index 3d350adab87..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.document.restapi; - -import com.yahoo.document.DocumentGet; -import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentOperation; -import com.yahoo.document.DocumentPut; -import com.yahoo.document.DocumentRemove; -import com.yahoo.document.DocumentUpdate; -import com.yahoo.documentapi.DocumentOperationParameters; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * @author jonmv - */ -public class DocumentOperationExecutorMock implements DocumentOperationExecutor { - - final AtomicReference<DocumentOperation> lastOperation = new AtomicReference<>(); - final AtomicReference<DocumentOperationParameters> lastParameters = new AtomicReference<>(); - final AtomicReference<OperationContext> lastOperationContext = new AtomicReference<>(); - final AtomicReference<VisitorOptions> lastOptions = new AtomicReference<>(); - final AtomicReference<VisitOperationsContext> lastVisitContext = new AtomicReference<>(); - - @Override - public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - setLastOperation(new DocumentGet(id), parameters, context); - } - - @Override - public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { - setLastOperation(put, parameters, context); - } - - @Override - public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { - setLastOperation(update, parameters, context); - } - - @Override - public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - setLastOperation(new DocumentRemove(id), parameters, context); - } - - @Override - public void visit(VisitorOptions options, VisitOperationsContext context) { - lastOptions.set(options); - lastVisitContext.set(context); - } - - @Override - public String routeToCluster(String cluster) { - if ("throw-me".equals(cluster)) - throw new IllegalArgumentException(cluster); - - return "route-to-" + cluster; - } - - public DocumentOperation lastOperation() { - return lastOperation.get(); - } - - public DocumentOperationParameters lastParameters() { - return lastParameters.get(); - } - - public OperationContext lastOperationContext() { - return lastOperationContext.get(); - } - - public VisitorOptions lastOptions() { - return lastOptions.get(); - } - - public VisitOperationsContext lastVisitContext() { - return lastVisitContext.get(); - } - - private void setLastOperation(DocumentOperation operation, DocumentOperationParameters parameters, OperationContext context) { - lastOperation.set(operation); - lastParameters.set(parameters); - lastOperationContext.set(context); - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java deleted file mode 100644 index 1d2f6af35dd..00000000000 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java +++ /dev/null @@ -1,406 +0,0 @@ -// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.document.restapi; - -import com.yahoo.application.container.DocumentAccesses; -import com.yahoo.cloud.config.ClusterListConfig; -import com.yahoo.document.Document; -import com.yahoo.document.DocumentPut; -import com.yahoo.document.DocumentType; -import com.yahoo.document.FixedBucketSpaces; -import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType; -import com.yahoo.document.restapi.DocumentOperationExecutor.Group; -import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext; -import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext; -import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; -import com.yahoo.document.restapi.DocumentOperationExecutorImpl.StorageCluster; -import com.yahoo.document.restapi.DocumentOperationExecutorImpl.DelayQueue; -import com.yahoo.documentapi.Result; -import com.yahoo.documentapi.VisitorControlHandler; -import com.yahoo.documentapi.local.LocalAsyncSession; -import com.yahoo.documentapi.local.LocalDocumentAccess; -import com.yahoo.test.ManualClock; -import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; - -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; -import static com.yahoo.documentapi.DocumentOperationParameters.parameters; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * This test uses a config definition for the "music" document type, which has a single string field "artist". - * One cluster named "content" exists, and can be reached through the "route" route for "music" documents. - * - * @author jonmv - */ -public class DocumentOperationExecutorTest { - - final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder() - .cluster("content", - new AllClustersBucketSpacesConfig.Cluster.Builder() - .documentType("music", - new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() - .bucketSpace(FixedBucketSpaces.defaultSpace()))) - .build(); - final ClusterListConfig clusterConfig = new ClusterListConfig.Builder() - .storage(new ClusterListConfig.Storage.Builder().configid("config-id") - .name("content")) - .build(); - final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder() - .resendDelayMillis(10) - .defaultTimeoutSeconds(1) - .maxThrottled(2) - .build(); - final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content", - "config-id", - Map.of("music", "route"))); - final List<Document> received = new ArrayList<>(); - final List<ErrorType> errors = new ArrayList<>(); - final List<String> messages = new ArrayList<>(); - final List<String> tokens = new ArrayList<>(); - ManualClock clock; - LocalDocumentAccess access; - DocumentOperationExecutorImpl executor; - DocumentType musicType; - Document doc1; - Document doc2; - Document doc3; - - OperationContext operationContext() { - return new OperationContext((type, error) -> { errors.add(type); messages.add(error); }, - document -> document.ifPresent(received::add)); - } - - VisitOperationsContext visitContext() { - return new VisitOperationsContext((type, error) -> { errors.add(type); messages.add(error); }, - token -> token.ifPresent(tokens::add), - received::add); - } - - LocalAsyncSession session() { - return (LocalAsyncSession) executor.asyncSession(); - } - - @Before - public void setUp() { - clock = new ManualClock(); - access = DocumentAccesses.createFromSchemas("src/test/cfg"); - executor = new DocumentOperationExecutorImpl(clusterConfig, bucketConfig, executorConfig, access, clock); - received.clear(); - errors.clear(); - tokens.clear(); - - musicType = access.getDocumentTypeManager().getDocumentType("music"); - doc1 = new Document(musicType, "id:ns:music::1"); doc1.setFieldValue("artist", "one"); - doc2 = new Document(musicType, "id:ns:music:n=1:2"); doc2.setFieldValue("artist", "two"); - doc3 = new Document(musicType, "id:ns:music:g=a:3"); - } - - @After - public void tearDown() { - access.shutdown(); - } - - @Test - public void testResolveCluster() { - assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", - executor.routeToCluster("content")); - try { - executor.routeToCluster("blargh"); - fail("Should not find this cluster"); - } - catch (IllegalArgumentException e) { - assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage()); - } - assertEquals("content", DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), clusters).name()); - try { - DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of()); - fail("No clusters should fail"); - } - catch (IllegalArgumentException e) { - assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage()); - } - try { - Map<String, StorageCluster> twoClusters = new TreeMap<>(); - twoClusters.put("one", new StorageCluster("one", "one-config", Map.of())); - twoClusters.put("two", new StorageCluster("two", "two-config", Map.of())); - DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), twoClusters); - fail("More than one cluster and no document type should fail"); - } - catch (IllegalArgumentException e) { - assertEquals("Please specify one of the content clusters in your Vespa deployment: 'one', 'two'", e.getMessage()); - } - } - - @Test - public void testThrottling() throws InterruptedException { - executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts. - // Put documents 1 and 2 into backend. - executor.put(new DocumentPut(doc1), parameters(), operationContext()); - executor.put(new DocumentPut(doc2), parameters(), operationContext()); - assertEquals(List.of(doc1, doc2), received); - - session().setResultType(Result.ResultType.TRANSIENT_ERROR); - - // First two are put on retry queue. - executor.get(doc1.getId(), parameters(), operationContext()); - executor.get(doc2.getId(), parameters(), operationContext()); - assertEquals(List.of(), errors); - - // Third operation is rejected. - executor.get(doc3.getId(), parameters(), operationContext()); - assertEquals(List.of(OVERLOAD), errors); - - // Maintainer does not yet run. - executor.notifyMaintainers(); - // Third operation is rejected again. - executor.get(doc3.getId(), parameters(), operationContext()); - assertEquals(List.of(OVERLOAD, OVERLOAD), errors); - - // Maintainer retries documents, but they're put back into the queue with a new delay. - clock.advance(Duration.ofMillis(20)); - executor.notifyMaintainers(); - assertEquals(List.of(OVERLOAD, OVERLOAD), errors); - - session().setResultType(Result.ResultType.SUCCESS); - // Maintainer retries documents again, this time successfully. - clock.advance(Duration.ofMillis(20)); - executor.notifyMaintainers(); - assertEquals(List.of(OVERLOAD, OVERLOAD), errors); - assertEquals(List.of(doc1, doc2, doc1, doc2), received); - } - - @Test - public void testTimeout() throws InterruptedException { - Phaser phaser = new Phaser(1); - access.setPhaser(phaser); - executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts. - - // Put 1 times out after 1010 ms, Put 2 succeeds after 1010 ms - executor.put(new DocumentPut(doc1), parameters(), operationContext()); - clock.advance(Duration.ofMillis(20)); - executor.put(new DocumentPut(doc2), parameters(), operationContext()); - executor.notifyMaintainers(); - assertEquals(List.of(), errors); - assertEquals(List.of(), received); - - clock.advance(Duration.ofMillis(990)); - executor.notifyMaintainers(); // Let doc1 time out. - phaser.arriveAndAwaitAdvance(); // Let doc2 arrive. - phaser.arriveAndAwaitAdvance(); // Wait for responses to be delivered. - assertEquals(List.of(TIMEOUT), errors); - assertEquals(List.of(doc2), received); - - session().setResultType(Result.ResultType.TRANSIENT_ERROR); - executor.put(new DocumentPut(doc3), parameters(), operationContext()); - clock.advance(Duration.ofMillis(990)); - executor.notifyMaintainers(); // Retry throttled operation. - clock.advance(Duration.ofMillis(20)); - executor.notifyMaintainers(); // Time out throttled operation. - assertEquals(List.of(TIMEOUT, TIMEOUT), errors); - assertEquals(List.of(doc2), received); - - session().setResultType(Result.ResultType.SUCCESS); - clock.advance(Duration.ofMillis(20)); - executor.notifyMaintainers(); // Retry not attempted since operation already timed out. - phaser.arriveAndAwaitAdvance(); - phaser.arriveAndAwaitAdvance(); - assertEquals(List.of(TIMEOUT, TIMEOUT), errors); - assertEquals(List.of(doc2), received); - } - - @Test - public void testCallback() { - AtomicBoolean called = new AtomicBoolean(); - executor.get(doc1.getId(), parameters().withResponseHandler(__ -> called.set(true)), operationContext()); - assertTrue(called.get()); - assertEquals(List.of(), messages); - assertEquals(List.of(), errors); - assertEquals(List.of(), received); - } - - @Test - public void testVisit() throws InterruptedException { - executor.put(new DocumentPut(doc1), parameters(), operationContext()); - executor.put(new DocumentPut(doc2), parameters(), operationContext()); - executor.put(new DocumentPut(doc3), parameters(), operationContext()); - assertEquals(doc1, received.remove(0)); - assertEquals(doc2, received.remove(0)); - assertEquals(doc3, received.remove(0)); - - // No cluster or document type set. - executor.visit(VisitorOptions.builder() - .build(), - visitContext()); - assertEquals("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level", messages.remove(0)); - assertEquals(BAD_REQUEST, errors.remove(0)); - assertEquals(List.of(), received); - - // Cluster not found. - executor.visit(VisitorOptions.builder() - .cluster("blargh") - .build(), - visitContext()); - assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", messages.remove(0)); - assertEquals(BAD_REQUEST, errors.remove(0)); - assertEquals(List.of(), received); - - // Matches doc2 for user 1. - executor.visit(VisitorOptions.builder() - .cluster("content") - .group(Group.of(1)) - .build(), - visitContext()); - for (VisitorControlHandler session : executor.visitorSessions()) { - session.waitUntilDone(); - } - assertEquals(List.of(), messages); - assertEquals(List.of(), errors); - assertEquals(doc2, received.remove(0)); - - // Matches documents in namespace ns of type music in group a. - executor.visit(VisitorOptions.builder() - .concurrency(2) - .wantedDocumentCount(3) - .namespace("ns") - .documentType("music") - .fieldSet("music:artist") - .group(Group.of("a")) - .build(), - visitContext()); - for (VisitorControlHandler session : executor.visitorSessions()) - session.waitUntilDone(); - assertEquals(List.of(), messages); - assertEquals(List.of(), errors); - assertEquals(doc3, received.remove(0)); - - // Matches documents with non-empty artist field. - executor.visit(VisitorOptions.builder() - .cluster("content") - .selection("music.artist") - .fieldSet("[id]") - .build(), - visitContext()); - for (VisitorControlHandler session : executor.visitorSessions()) - session.waitUntilDone(); - assertEquals(List.of(), messages); - assertEquals(List.of(), errors); - assertEquals(List.of(doc1.getId(), doc2.getId()), List.of(received.remove(0).getId(), received.remove(0).getId())); - - // Matches all documents, but we'll shut down midway. - Phaser phaser = new Phaser(1); - access.setPhaser(phaser); - executor.visit(VisitorOptions.builder() - .cluster("content") - .bucketSpace("global") - .build(), - visitContext()); - phaser.arriveAndAwaitAdvance(); // First document pending - CountDownLatch latch = new CountDownLatch(1); - Thread shutdownThread = new Thread(() -> { - executor.shutdown(); - latch.countDown(); - }); - shutdownThread.start(); - clock.advance(Duration.ofMillis(100)); - executor.notifyMaintainers(); // Purge timeout operations so maintainers can shut down quickly. - latch.await(); // Make sure visit session is shut down before next document is considered. - phaser.awaitAdvance(phaser.arriveAndDeregister()); // See above. - for (VisitorControlHandler session : executor.visitorSessions()) { - session.waitUntilDone(); - } - assertEquals(List.of(), messages); - assertEquals(List.of(), errors); - assertEquals(List.of(doc1), received); - } - - @Test - public void testDelayQueue() throws ExecutionException, InterruptedException, TimeoutException { - Supplier<Result> nullOperation = () -> null; - AtomicLong counter1 = new AtomicLong(0); - AtomicLong counter2 = new AtomicLong(0); - AtomicLong counter3 = new AtomicLong(0); - AtomicBoolean throttle = new AtomicBoolean(true); - OperationContext context1 = new OperationContext((type, message) -> counter1.decrementAndGet(), doc -> counter1.incrementAndGet()); - OperationContext context2 = new OperationContext((type, message) -> counter2.decrementAndGet(), doc -> counter2.incrementAndGet()); - OperationContext context3 = new OperationContext((type, message) -> counter3.decrementAndGet(), doc -> counter3.incrementAndGet()); - DelayQueue queue = new DelayQueue(3, - (operation, context) -> { - if (throttle.get()) - return false; - - context.success(Optional.empty()); - return true; - }, - Duration.ofMillis(30), - clock, - "test"); - synchronized (queue) { queue.notify(); queue.wait(); } // Make sure maintainers have gone to wait before test starts. - - // Add three operations: - // the first shall be handled by the queue on second attempt, - // the second by an external call,and - // the third during shutdown — added later. - assertTrue(queue.add(nullOperation, context1)); - clock.advance(Duration.ofMillis(20)); - assertTrue(queue.add(nullOperation, context2)); - assertTrue(queue.add(nullOperation, context3)); - assertFalse("New entries should be rejected by a full queue", queue.add(nullOperation, context3)); - assertEquals(3, queue.size()); - assertEquals(0, counter1.get()); - assertEquals(0, counter2.get()); - assertEquals(0, counter3.get()); - - context2.error(ERROR, "error"); // Marks this as handled, ready to be evicted. - synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer does not run yet, as it's not yet time. - assertEquals(0, counter1.get()); - assertEquals(-1, counter2.get()); - assertEquals(0, counter3.get()); - assertEquals(3, queue.size()); - - clock.advance(Duration.ofMillis(15)); - synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer now runs, failing to handle first and evicting second entry. - assertEquals(0, counter1.get()); - assertEquals(-1, counter2.get()); - assertEquals(0, counter3.get()); - assertEquals(2, queue.size()); - - throttle.set(false); - clock.advance(Duration.ofMillis(15)); - synchronized (queue) { queue.notify(); queue.wait(); } // Maintainer runs again, successfully handling first entry. - assertEquals(1, counter1.get()); - assertEquals(-1, counter2.get()); - assertEquals(0, counter3.get()); - assertEquals(1, queue.size()); - - queue.shutdown(Duration.ZERO, context -> context.error(ERROR, "shutdown")) - .get(1, TimeUnit.SECONDS); - assertEquals(1, counter1.get()); - assertEquals(-1, counter2.get()); - assertEquals(-1, counter3.get()); - assertEquals(0, queue.size()); - } - -} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index c6b611ed3c7..6de21e82524 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -1,31 +1,55 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document.restapi.resource; +import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.container.jdisc.RequestHandlerTestDriver; import com.yahoo.docproc.jdisc.metric.NullMetric; import com.yahoo.document.Document; -import com.yahoo.document.DocumentGet; +import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentRemove; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.document.datatypes.StringFieldValue; -import com.yahoo.document.restapi.DocumentOperationExecutor.Group; -import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; -import com.yahoo.document.restapi.DocumentOperationExecutorMock; -import com.yahoo.document.restapi.resource.DocumentV1ApiHandler.DocumentOperationParser; +import com.yahoo.document.restapi.DocumentOperationExecutorConfig; +import com.yahoo.document.restapi.resource.DocumentV1ApiHandler.StorageCluster; import com.yahoo.document.update.FieldUpdate; +import com.yahoo.documentapi.AckToken; +import com.yahoo.documentapi.AsyncParameters; +import com.yahoo.documentapi.AsyncSession; +import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.DocumentAccessParams; -import com.yahoo.documentapi.local.LocalDocumentAccess; +import com.yahoo.documentapi.DocumentIdResponse; +import com.yahoo.documentapi.DocumentOperationParameters; +import com.yahoo.documentapi.DocumentResponse; +import com.yahoo.documentapi.DumpVisitorDataHandler; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.SubscriptionParameters; +import com.yahoo.documentapi.SubscriptionSession; +import com.yahoo.documentapi.SyncParameters; +import com.yahoo.documentapi.SyncSession; +import com.yahoo.documentapi.UpdateResponse; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorDestinationParameters; +import com.yahoo.documentapi.VisitorDestinationSession; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorResponse; +import com.yahoo.documentapi.VisitorSession; import com.yahoo.jdisc.Metric; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.messagebus.Trace; import com.yahoo.metrics.simple.MetricReceiver; import com.yahoo.searchdefinition.derived.Deriver; import com.yahoo.slime.Inspector; import com.yahoo.slime.JsonFormat; import com.yahoo.slime.SlimeUtils; import com.yahoo.test.ManualClock; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -33,14 +57,16 @@ import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Consumer; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.INSUFFICIENT_STORAGE; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; import static com.yahoo.documentapi.DocumentOperationParameters.parameters; import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE; import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS; @@ -50,12 +76,28 @@ import static com.yahoo.jdisc.http.HttpRequest.Method.PUT; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @author jonmv */ public class DocumentV1ApiTest { + final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder() + .cluster("content", + new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", + new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace(FixedBucketSpaces.defaultSpace()))) + .build(); + final ClusterListConfig clusterConfig = new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder().configid("config-id") + .name("content")) + .build(); + final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder() + .maxThrottled(2) + .resendDelayMillis(1 << 30) + .build(); final DocumentmanagerConfig docConfig = Deriver.getDocumentManagerConfig("src/test/cfg/music.sd").build(); final DocumentTypeManager manager = new DocumentTypeManager(docConfig); final Document doc1 = new Document(manager.getDocumentType("music"), "id:space:music::one"); @@ -66,10 +108,11 @@ public class DocumentV1ApiTest { doc2.setFieldValue("artist", "Asa-Chan & Jun-Ray"); } + final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content", + "config-id", + Map.of("music", "default"))); ManualClock clock; - DocumentOperationParser parser; - LocalDocumentAccess access; - DocumentOperationExecutorMock executor; + MockDocumentAccess access; Metric metric; MetricReceiver metrics; DocumentV1ApiHandler handler; @@ -77,12 +120,10 @@ public class DocumentV1ApiTest { @Before public void setUp() { clock = new ManualClock(); - parser = new DocumentOperationParser(docConfig); - access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(docConfig)); - executor = new DocumentOperationExecutorMock(); + access = new MockDocumentAccess(docConfig); metric = new NullMetric(); metrics = new MetricReceiver.MockReceiver(); - handler = new DocumentV1ApiHandler(clock, executor, parser, metric, metrics); + handler = new DocumentV1ApiHandler(clock, metric, metrics, access, docConfig, executorConfig, clusterConfig, bucketConfig); } @After @@ -91,32 +132,78 @@ public class DocumentV1ApiTest { } @Test + public void testResolveCluster() { + assertEquals("content", + DocumentV1ApiHandler.resolveCluster(Optional.empty(), clusters).name()); + assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", + DocumentV1ApiHandler.resolveCluster(Optional.of("content"), clusters).route()); + try { + DocumentV1ApiHandler.resolveCluster(Optional.empty(), Map.of()); + fail("Should fail without any clusters"); + } + catch (IllegalArgumentException e) { + assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage()); + } + try { + DocumentV1ApiHandler.resolveCluster(Optional.of("blargh"), clusters); + fail("Should not find this cluster"); + } + catch (IllegalArgumentException e) { + assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage()); + } + try { + Map<String, StorageCluster> twoClusters = new TreeMap<>(); + twoClusters.put("one", new StorageCluster("one", "one-config", Map.of())); + twoClusters.put("two", new StorageCluster("two", "two-config", Map.of())); + DocumentV1ApiHandler.resolveCluster(Optional.empty(), twoClusters); + fail("More than one cluster and no document type should fail"); + } + catch (IllegalArgumentException e) { + assertEquals("Please specify one of the content clusters in your Vespa deployment: 'one', 'two'", e.getMessage()); + } + StorageCluster cluster = DocumentV1ApiHandler.resolveCluster(Optional.of("content"), clusters); + assertEquals(FixedBucketSpaces.defaultSpace(), + DocumentV1ApiHandler.resolveBucket(cluster, Optional.of("music"), List.of(), Optional.empty())); + assertEquals(FixedBucketSpaces.globalSpace(), + DocumentV1ApiHandler.resolveBucket(cluster, Optional.empty(), List.of(FixedBucketSpaces.globalSpace()), Optional.of("global"))); + } + + @Test public void testResponses() { + Executor visitCompleter = Executors.newSingleThreadExecutor(); RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); // GET at non-existent path returns 404 with available paths var response = driver.sendRequest("http://localhost/document/v1/not-found"); assertSameJson("{" + " \"pathId\": \"/document/v1/not-found\"," + " \"message\": \"Nothing at '/document/v1/not-found'. Available paths are:\\n" + - "/document/v1/\\n" + - "/document/v1/{namespace}/{documentType}/docid/\\n" + - "/document/v1/{namespace}/{documentType}/group/{group}/\\n" + - "/document/v1/{namespace}/{documentType}/number/{number}/\\n" + - "/document/v1/{namespace}/{documentType}/docid/{docid}\\n" + - "/document/v1/{namespace}/{documentType}/group/{group}/{docid}\\n" + - "/document/v1/{namespace}/{documentType}/number/{number}/{docid}\"" + - "}", - response.readAll()); + "/document/v1/\\n" + + "/document/v1/{namespace}/{documentType}/docid/\\n" + + "/document/v1/{namespace}/{documentType}/group/{group}/\\n" + + "/document/v1/{namespace}/{documentType}/number/{number}/\\n" + + "/document/v1/{namespace}/{documentType}/docid/{docid}\\n" + + "/document/v1/{namespace}/{documentType}/group/{group}/{docid}\\n" + + "/document/v1/{namespace}/{documentType}/number/{number}/{docid}\"" + + "}", response.readAll()); assertEquals("application/json; charset=UTF-8", response.getResponse().headers().getFirst("Content-Type")); assertEquals(404, response.getStatus()); // GET at root is a visit. Numeric parameters have an upper bound. - response = driver.sendRequest("http://localhost/document/v1?cluster=lackluster&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" + - "&selection=all%20the%20things&fieldSet=[id]&continuation=token"); - executor.lastVisitContext().document(doc1); - executor.lastVisitContext().document(doc2); - executor.lastVisitContext().document(doc3); - executor.lastVisitContext().success(Optional.of("token")); + access.expect(parameters -> { + assertEquals("[Storage:cluster=content;clusterconfigid=config-id]", parameters.getRoute().toString()); + assertEquals("default", parameters.getBucketSpace()); + assertEquals(1024, parameters.getMaxTotalHits()); + assertEquals(100, ((StaticThrottlePolicy) parameters.getThrottlePolicy()).getMaxPendingCount()); + assertEquals("[id]", parameters.getFieldSet()); + assertEquals("(all the things)", parameters.getDocumentSelection()); + // Put some documents in the response + ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc1, 0); + ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc2, 0); + ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc3, 0); + visitCompleter.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "message")); + }); + response = driver.sendRequest("http://localhost/document/v1?cluster=content&bucketSpace=default&wantedDocumentCount=1025&concurrency=123" + + "&selection=all%20the%20things&fieldSet=[id]"); assertSameJson("{" + " \"pathId\": \"/document/v1\"," + " \"documents\": [" + @@ -136,125 +223,129 @@ public class DocumentV1ApiTest { " \"id\": \"id:space:music:g=a:three\"," + " \"fields\": {}" + " }" + - " ]," + - " \"continuation\": \"token\"" + - "}", - response.readAll()); + " ]" + + "}", response.readAll()); assertEquals(200, response.getStatus()); - assertEquals(VisitorOptions.builder().cluster("lackluster").bucketSpace("default").wantedDocumentCount(1024) - .concurrency(100).selection("all the things").fieldSet("[id]").continuation("token").build(), - executor.lastOptions()); // GET with namespace and document type is a restricted visit. - response = driver.sendRequest("http://localhost/document/v1/space/music/docid"); - executor.lastVisitContext().error(BAD_REQUEST, "nope"); + access.expect(parameters -> { + assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection()); + throw new IllegalArgumentException("parse failure"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + new ProgressToken().serializeToString()); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/docid\"," + - " \"documents\": []," + - " \"message\": \"nope\"" + - "}", - response.readAll()); + " \"message\": \"parse failure\"" + + "}", response.readAll()); assertEquals(400, response.getStatus()); - assertEquals(VisitorOptions.builder().namespace("space").documentType("music").build(), - executor.lastOptions()); // GET with namespace, document type and group is a restricted visit. - response = driver.sendRequest("http://localhost/document/v1/space/music/group/best"); - executor.lastVisitContext().error(ERROR, "error"); + access.expect(parameters -> { + assertEquals("(music) and (id.namespace=='space') and (id.group=='best\\'')", parameters.getDocumentSelection()); + visitCompleter.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.FAILURE, "error")); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/group/best%27"); assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/group/best\"," + + " \"pathId\": \"/document/v1/space/music/group/best%27\"," + " \"documents\": []," + " \"message\": \"error\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(500, response.getStatus()); - assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of("best")).build(), - executor.lastOptions()); // GET with namespace, document type and number is a restricted visit. + access.expect(parameters -> { + assertEquals("(music) and (id.namespace=='space') and (id.user==123)", parameters.getDocumentSelection()); + visitCompleter.execute(() -> parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "aborted")); + }); response = driver.sendRequest("http://localhost/document/v1/space/music/number/123"); - executor.lastVisitContext().success(Optional.empty()); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/123\"," + " \"documents\": []" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(200, response.getStatus()); - assertEquals(VisitorOptions.builder().namespace("space").documentType("music").group(Group.of(123)).build(), - executor.lastOptions()); // GET with full document ID is a document get operation which returns 404 when no document is found - response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=lackluster&fieldSet=go"); - executor.lastOperationContext().success(Optional.empty()); + access.session.expect((id, parameters) -> { + assertEquals(doc1.getId(), id); + assertEquals(parameters().withRoute("[Storage:cluster=content;clusterconfigid=config-id]").withFieldSet("go"), parameters); + parameters.responseHandler().get().handleResponse(new DocumentResponse(0, null)); + return new Result(Result.ResultType.SUCCESS, null); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=content&fieldSet=go"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/docid/one\"," + " \"id\": \"id:space:music::one\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(404, response.getStatus()); - assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation()); - assertEquals(parameters().withRoute("route-to-lackluster").withFieldSet("go"), executor.lastParameters()); // GET with full document ID is a document get operation. + access.session.expect((id, parameters) -> { + assertEquals(doc1.getId(), id); + assertEquals(parameters(), parameters); + parameters.responseHandler().get().handleResponse(new DocumentResponse(0, doc1)); + return new Result(Result.ResultType.SUCCESS, null); + }); response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?"); - executor.lastOperationContext().success(Optional.of(doc1)); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/docid/one\"," + " \"id\": \"id:space:music::one\"," + " \"fields\": {" + " \"artist\": \"Tom Waits\"" + " }" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(200, response.getStatus()); - assertEquals(new DocumentGet(doc1.getId()), executor.lastOperation()); - assertEquals(parameters(), executor.lastParameters()); // GET with not encoded / in user specified part of document id is a 404 + access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); }); response = driver.sendRequest("http://localhost/document/v1/space/music/docid/one/two/three"); response.readAll(); // Must drain body. assertEquals(404, response.getStatus()); // POST with a document payload is a document put operation. + access.session.expect((put, parameters) -> { + DocumentPut expectedPut = new DocumentPut(doc2); + expectedPut.setCondition(new TestAndSetCondition("test it")); + assertEquals(expectedPut, put); + assertEquals(parameters(), parameters); + parameters.responseHandler().get().handleResponse(new DocumentResponse(0, doc2)); + return new Result(Result.ResultType.SUCCESS, null); + }); response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST, "{" + " \"fields\": {" + " \"artist\": \"Asa-Chan & Jun-Ray\"" + " }" + "}"); - executor.lastOperationContext().success(Optional.empty()); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + " \"id\": \"id:space:music:n=1:two\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(200, response.getStatus()); - DocumentPut put = new DocumentPut(doc2); - put.setCondition(new TestAndSetCondition("test it")); - assertEquals(put, executor.lastOperation()); - assertEquals(parameters(), executor.lastParameters()); // PUT with a document update payload is a document update operation. + access.session.expect((update, parameters) -> { + DocumentUpdate expectedUpdate = new DocumentUpdate(doc3.getDataType(), doc3.getId()); + expectedUpdate.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl"))); + expectedUpdate.setCreateIfNonExistent(true); + assertEquals(expectedUpdate, update); + assertEquals(parameters(), parameters); + parameters.responseHandler().get().handleResponse(new UpdateResponse(0, true)); + return new Result(Result.ResultType.SUCCESS, null); + }); response = driver.sendRequest("http://localhost/document/v1/space/music/group/a/three?create=true", PUT, "{" + " \"fields\": {" + " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" + " }" + "}"); - executor.lastOperationContext().success(Optional.empty()); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/group/a/three\"," + " \"id\": \"id:space:music:g=a:three\"" + - "}", - response.readAll()); - DocumentUpdate update = new DocumentUpdate(doc3.getDataType(), doc3.getId()); - update.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl"))); - update.setCreateIfNonExistent(true); - assertEquals(update, executor.lastOperation()); - assertEquals(parameters(), executor.lastParameters()); + "}", response.readAll()); assertEquals(200, response.getStatus()); // POST with illegal payload is a 400 + access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); }); response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?condition=test%20it", POST, "{" + " ┻━┻︵ \\(°□°)/ ︵ ┻━┻" + @@ -265,6 +356,7 @@ public class DocumentV1ApiTest { assertEquals(400, response.getStatus()); // PUT on a unknown document type is a 400 + access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); }); response = driver.sendRequest("http://localhost/document/v1/space/house/group/a/three?create=true", PUT, "{" + " \"fields\": {" + @@ -274,101 +366,229 @@ public class DocumentV1ApiTest { assertSameJson("{" + " \"pathId\": \"/document/v1/space/house/group/a/three\"," + " \"message\": \"Document type house does not exist\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(400, response.getStatus()); // DELETE with full document ID is a document remove operation. - response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?route=route", DELETE); - executor.lastOperationContext().success(Optional.empty()); + access.session.expect((remove, parameters) -> { + DocumentRemove expectedRemove = new DocumentRemove(doc2.getId()); + expectedRemove.setCondition(new TestAndSetCondition("false")); + assertEquals(new DocumentRemove(doc2.getId()), remove); + assertEquals(parameters.withRoute("route"), parameters); + parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId())); + return new Result(Result.ResultType.SUCCESS, null); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?route=route&condition=false", DELETE); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + " \"id\": \"id:space:music:n=1:two\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(200, response.getStatus()); - assertEquals(new DocumentRemove(doc2.getId()), executor.lastOperation()); - assertEquals(parameters().withRoute("route"), executor.lastParameters()); // GET with non-existent cluster is a 400 + access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); }); response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two?cluster=throw-me"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"message\": \"throw-me\"" + - "}", - response.readAll()); + " \"message\": \"Your Vespa deployment has no content cluster 'throw-me', only 'content'\"" + + "}", response.readAll()); assertEquals(400, response.getStatus()); - // TIMEOUT is a 504 + // INSUFFICIENT_STORAGE is a 507 + access.session.expect((id, parameters) -> { + parameters.responseHandler().get().handleResponse(new Response(0, "disk full", Response.Outcome.INSUFFICIENT_STORAGE)); + return new Result(Result.ResultType.SUCCESS, null); + }); response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); - executor.lastOperationContext().error(TIMEOUT, "timeout"); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"id\": \"id:space:music:n=1:two\"," + - " \"message\": \"timeout\"" + - "}", - response.readAll()); - assertEquals(504, response.getStatus()); - - // INSUFFICIENT_STORAGE is a 504 - response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); - executor.lastOperationContext().error(INSUFFICIENT_STORAGE, "disk full"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + " \"id\": \"id:space:music:n=1:two\"," + " \"message\": \"disk full\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(507, response.getStatus()); - // OVERLOAD is a 429 - response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); - executor.lastOperationContext().error(OVERLOAD, "overload"); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/number/1/two\"," + - " \"id\": \"id:space:music:n=1:two\"," + - " \"message\": \"overload\"" + - "}", - response.readAll()); - assertEquals(429, response.getStatus()); - // PRECONDITION_FAILED is a 412 + access.session.expect((id, parameters) -> { + parameters.responseHandler().get().handleResponse(new Response(0, "no dice", Response.Outcome.CONDITION_FAILED)); + return new Result(Result.ResultType.SUCCESS, null); + }); response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); - executor.lastOperationContext().error(PRECONDITION_FAILED, "no dice"); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + " \"id\": \"id:space:music:n=1:two\"," + " \"message\": \"no dice\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(412, response.getStatus()); - // Client close during processing gives empty body - response = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); - response.clientClose(); - executor.lastOperationContext().error(TIMEOUT, "no dice"); - assertEquals("", response.readAll()); - assertEquals(504, response.getStatus()); - // OPTIONS gets options + access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); }); response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", OPTIONS); assertEquals("", response.readAll()); assertEquals(204, response.getStatus()); assertEquals("GET,POST,PUT,DELETE", response.getResponse().headers().getFirst("Allow")); // PATCH is not allowed + access.session.expect((__, ___) -> { throw new AssertionError("Not supposed to happen"); }); response = driver.sendRequest("https://localhost/document/v1/space/music/docid/one", PATCH); assertSameJson("{" + " \"pathId\": \"/document/v1/space/music/docid/one\"," + " \"message\": \"'PATCH' not allowed at '/document/v1/space/music/docid/one'. Allowed methods are: GET, POST, PUT, DELETE\"" + - "}", - response.readAll()); + "}", response.readAll()); assertEquals(405, response.getStatus()); + // OVERLOAD is a 429 + access.session.expect((id, parameters) -> new Result(Result.ResultType.TRANSIENT_ERROR, new Error("overload"))); + var response1 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); + var response2 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); + var response3 = driver.sendRequest("http://localhost/document/v1/space/music/number/1/two"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"Rejecting execution due to overload: 2 requests already enqueued\"" + + "}", response3.readAll()); + assertEquals(429, response3.getStatus()); + access.session.expect((id, parameters) -> new Result(Result.ResultType.FATAL_ERROR, new Error("error"))); + handler.dispatchEnqueued(); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"error\"" + + "}", response1.readAll()); + assertEquals(500, response1.getStatus()); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/number/1/two\"," + + " \"message\": \"error\"" + + "}", response2.readAll()); + assertEquals(500, response2.getStatus()); + driver.close(); } - void assertSameJson(String expected, String actual) { + + static class MockDocumentAccess extends DocumentAccess { + + private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>(); + private final MockAsyncSession session = new MockAsyncSession(); + + MockDocumentAccess(DocumentmanagerConfig config) { + super(new DocumentAccessParams().setDocumentmanagerConfig(config)); + } + + @Override + public SyncSession createSyncSession(SyncParameters parameters) { + throw new AssertionError("Not used"); + } + + @Override + public AsyncSession createAsyncSession(AsyncParameters parameters) { + return session; + } + + @Override + public VisitorSession createVisitorSession(VisitorParameters parameters) { + expectations.get().accept(parameters); + return new VisitorSession() { + @Override public boolean isDone() { return false; } + @Override public ProgressToken getProgress() { return null; } + @Override public Trace getTrace() { return null; } + @Override public boolean waitUntilDone(long timeoutMs) { return false; } + @Override public void ack(AckToken token) { } + @Override public void abort() { } + @Override public VisitorResponse getNext() { return null; } + @Override public VisitorResponse getNext(int timeoutMilliseconds) { return null; } + @Override public void destroy() { } + }; + } + + @Override + public VisitorDestinationSession createVisitorDestinationSession(VisitorDestinationParameters parameters) { + throw new AssertionError("Not used"); + } + + @Override + public SubscriptionSession createSubscription(SubscriptionParameters parameters) { + throw new AssertionError("Not used"); + } + + @Override + public SubscriptionSession openSubscription(SubscriptionParameters parameters) { + throw new AssertionError("Not used"); + } + + public void expect(Consumer<VisitorParameters> expectations) { + this.expectations.set(expectations); + } + + } + + + static class MockAsyncSession implements AsyncSession { + + private final AtomicReference<BiFunction<Object, DocumentOperationParameters, Result>> expectations = new AtomicReference<>(); + + @Override + public Result put(Document document) { + throw new AssertionError("Not used"); + } + + @Override + public Result put(DocumentPut documentPut, DocumentOperationParameters parameters) { + return expectations.get().apply(documentPut, parameters); + } + + @Override + public Result get(DocumentId id) { + throw new AssertionError("Not used"); + } + + @Override + public Result get(DocumentId id, DocumentOperationParameters parameters) { + return expectations.get().apply(id, parameters); + } + + @Override + public Result remove(DocumentId id) { + throw new AssertionError("Not used"); + } + + @Override + public Result remove(DocumentRemove remove, DocumentOperationParameters parameters) { + return expectations.get().apply(remove, parameters); + } + + @Override + public Result update(DocumentUpdate update) { + throw new AssertionError("Not used"); + } + + @Override + public Result update(DocumentUpdate update, DocumentOperationParameters parameters) { + return expectations.get().apply(update, parameters); + } + + @Override + public double getCurrentWindowSize() { + throw new AssertionError("Not used"); + } + + public void expect(BiFunction<Object, DocumentOperationParameters, Result> expectations) { + this.expectations.set(expectations); + } + + @Override + public Response getNext() { + throw new AssertionError("Not used"); + } + + @Override + public Response getNext(int timeoutMilliseconds) { + throw new AssertionError("Not used"); + } + + @Override + public void destroy() { } + + } + + static void assertSameJson(String expected, String actual) { ByteArrayOutputStream expectedPretty = new ByteArrayOutputStream(); ByteArrayOutputStream actualPretty = new ByteArrayOutputStream(); JsonFormat formatter = new JsonFormat(false); |