summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-29 19:16:54 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 10:23:35 +0200
commit93f607279f17812125f95ee6c3203495c5264106 (patch)
tree46ab48569a5dc8ff0cc729874c1d60d84c997080 /vespaclient-container-plugin
parent96c27a8107d572f624018f367a369989efa74f84 (diff)
Separate out interface for DocumentOperationExecutor
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java620
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java515
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java5
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java82
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java60
5 files changed, 748 insertions, 534 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
index fb9a6eb5873..8b0c966c46f 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java
@@ -1,195 +1,53 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.restapi;
-import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.fieldset.AllFields;
-import com.yahoo.document.select.parser.ParseException;
-import com.yahoo.documentapi.AsyncParameters;
-import com.yahoo.documentapi.AsyncSession;
-import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentOperationParameters;
-import com.yahoo.documentapi.DocumentResponse;
-import com.yahoo.documentapi.DumpVisitorDataHandler;
import com.yahoo.documentapi.ProgressToken;
-import com.yahoo.documentapi.Response;
-import com.yahoo.documentapi.Result;
-import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorParameters;
-import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.text.Text;
-import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
-import com.yahoo.yolean.Exceptions;
-import java.time.Clock;
import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import java.util.function.Supplier;
-import java.util.logging.Logger;
import java.util.stream.Stream;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
-import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
-import static java.util.Objects.requireNonNull;
-import static java.util.logging.Level.SEVERE;
-import static java.util.logging.Level.WARNING;
-import static java.util.stream.Collectors.toMap;
-import static java.util.stream.Collectors.toUnmodifiableMap;
-
/**
- * Encapsulates a document access and supports running asynchronous document
- * operations and visits against this, with retries and optional timeouts.
+ * Wraps the document API with an executor that can retry and time out document operations,
+ * as well as compute the required visitor parameters for visitor sessions.
*
* @author jonmv
*/
-public class DocumentOperationExecutor {
-
- private static final Logger log = Logger.getLogger(DocumentOperationExecutor.class.getName());
-
- private final Duration visitTimeout;
- private final long maxThrottled;
- private final DocumentAccess access;
- private final AsyncSession asyncSession;
- private final Map<String, StorageCluster> clusters;
- private final Clock clock;
- private final DelayQueue throttled;
- private final DelayQueue timeouts;
- private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
- private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
-
- public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
- DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
- this(Duration.ofMillis(executorConfig.resendDelayMillis()),
- Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
- Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
- executorConfig.maxThrottled(),
- access,
- parseClusters(clustersConfig, bucketsConfig),
- clock);
- }
+public interface DocumentOperationExecutor {
- DocumentOperationExecutor(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
- DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
- this.visitTimeout = requireNonNull(visitTimeout);
- this.maxThrottled = maxThrottled;
- this.access = requireNonNull(access);
- this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
- this.clock = requireNonNull(clock);
- this.clusters = Map.copyOf(clusters);
- this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
- this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
- }
+ default void shutdown() { }
- /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
- public void shutdown() {
- long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
- visits.values().forEach(VisitorSession::destroy);
- Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
- context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
- Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
- context -> context.error(TIMEOUT, "Timed out due to shutdown"));
- try {
- throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException | ExecutionException | TimeoutException e) {
- throttleShutdown.cancel(true);
- throttleShutdown.cancel(true);
- log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
- }
- }
+ void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context);
- public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.get(id, parameters), context);
- }
+ void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context);
- public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.put(put, parameters), context);
- }
+ void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context);
- public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.update(update, parameters), context);
- }
+ void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context);
- public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
- accept(() -> asyncSession.remove(id, parameters), context);
- }
-
- public void visit(VisitorOptions options, VisitOperationsContext context) {
- try {
- AtomicBoolean done = new AtomicBoolean(false);
- VisitorParameters parameters = options.asParameters(clusters, visitTimeout);
- parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
- @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
- @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
- });
- parameters.setControlHandler(new VisitorControlHandler() {
- @Override public void onDone(CompletionCode code, String message) {
- super.onDone(code, message);
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets())
- context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
- case SUCCESS: // intentional fallthrough
- case ABORTED:
- context.success(Optional.ofNullable(getProgress())
- .filter(progress -> ! progress.isFinished())
- .map(ProgressToken::serializeToString));
- break;
- default:
- context.error(ERROR, message != null ? message : "Visiting failed");
- }
- done.set(true); // This may be reached before dispatching thread is done putting us in the map.
- visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
- }
- });
- visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
- if (done.get())
- visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
- }
- catch (IllegalArgumentException | ParseException e) {
- context.error(BAD_REQUEST, Exceptions.toMessageString(e));
- }
- catch (RuntimeException e) {
- context.error(ERROR, Exceptions.toMessageString(e));
- }
- }
-
- public String routeToCluster(String cluster) {
- return resolveCluster(Optional.of(cluster), clusters).route();
- }
+ void visit(VisitorOptions options, VisitOperationsContext context);
+ String routeToCluster(String cluster);
- public enum ErrorType {
+ enum ErrorType {
OVERLOAD,
NOT_FOUND,
PRECONDITION_FAILED,
@@ -199,8 +57,37 @@ public class DocumentOperationExecutor {
}
+ /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
+ class Context<T> {
+
+ private final AtomicBoolean handled = new AtomicBoolean();
+ private final BiConsumer<ErrorType, String> onError;
+ private final Consumer<T> onSuccess;
+
+ Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
+ this.onError = onError;
+ this.onSuccess = onSuccess;
+ }
+
+ public void error(ErrorType type, String message) {
+ if ( ! handled.getAndSet(true))
+ onError.accept(type, message);
+ }
+
+ public void success(T result) {
+ if ( ! handled.getAndSet(true))
+ onSuccess.accept(result);
+ }
+
+ public boolean handled() {
+ return handled.get();
+ }
+
+ }
+
+
/** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */
- public static class VisitOperationsContext extends Context<Optional<String>> {
+ class VisitOperationsContext extends Context<Optional<String>> {
private final Consumer<Document> onDocument;
@@ -209,7 +96,7 @@ public class DocumentOperationExecutor {
this.onDocument = onDocument;
}
- void document(Document document) {
+ public void document(Document document) {
if ( ! handled())
onDocument.accept(document);
}
@@ -218,7 +105,7 @@ public class DocumentOperationExecutor {
/** Context for a document operation. */
- public static class OperationContext extends Context<Optional<Document>> {
+ class OperationContext extends Context<Optional<Document>> {
public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) {
super(onError, onSuccess);
@@ -227,22 +114,22 @@ public class DocumentOperationExecutor {
}
- public static class VisitorOptions {
+ class VisitorOptions {
- private final Optional<String> cluster;
- private final Optional<String> namespace;
- private final Optional<String> documentType;
- private final Optional<Group> group;
- private final Optional<String> selection;
- private final Optional<String> fieldSet;
- private final Optional<String> continuation;
- private final Optional<String> bucketSpace;
- private final Optional<Integer> wantedDocumentCount;
- private final Optional<Integer> concurrency;
+ final Optional<String> cluster;
+ final Optional<String> namespace;
+ final Optional<String> documentType;
+ final Optional<Group> group;
+ final Optional<String> selection;
+ final Optional<String> fieldSet;
+ final Optional<String> continuation;
+ final Optional<String> bucketSpace;
+ final Optional<Integer> wantedDocumentCount;
+ final Optional<Integer> concurrency;
private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
- Optional<String> continuation,Optional<String> bucketSpace,
+ Optional<String> continuation, Optional<String> bucketSpace,
Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
this.cluster = cluster;
this.namespace = namespace;
@@ -256,38 +143,42 @@ public class DocumentOperationExecutor {
this.concurrency = concurrency;
}
- private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) {
- if (cluster.isEmpty() && documentType.isEmpty())
- throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
-
- VisitorParameters parameters = new VisitorParameters(Stream.of(selection,
- documentType,
- namespace.map(value -> "id.namespace=='" + value + "'"),
- group.map(Group::selection))
- .flatMap(Optional::stream)
- .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
- StringJoiner::add,
- StringJoiner::merge)
- .toString());
-
- continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
- parameters.setFieldSet(fieldSet.orElse(documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
- wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
- parameters.setMaxTotalHits(wantedDocumentCount.orElse(1 << 10));
- concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
- parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency.orElse(1)));
- parameters.setTimeoutMs(visitTimeout.toMillis());
- parameters.visitInconsistentBuckets(true);
- parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
-
- StorageCluster storageCluster = resolveCluster(cluster, clusters);
- parameters.setRoute(storageCluster.route());
- parameters.setBucketSpace(resolveBucket(storageCluster,
- documentType,
- List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
- bucketSpace));
-
- return parameters;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ VisitorOptions that = (VisitorOptions) o;
+ return cluster.equals(that.cluster) &&
+ namespace.equals(that.namespace) &&
+ documentType.equals(that.documentType) &&
+ group.equals(that.group) &&
+ selection.equals(that.selection) &&
+ fieldSet.equals(that.fieldSet) &&
+ continuation.equals(that.continuation) &&
+ bucketSpace.equals(that.bucketSpace) &&
+ wantedDocumentCount.equals(that.wantedDocumentCount) &&
+ concurrency.equals(that.concurrency);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency);
+ }
+
+ @Override
+ public String toString() {
+ return "VisitorOptions{" +
+ "cluster=" + cluster +
+ ", namespace=" + namespace +
+ ", documentType=" + documentType +
+ ", group=" + group +
+ ", selection=" + selection +
+ ", fieldSet=" + fieldSet +
+ ", continuation=" + continuation +
+ ", bucketSpace=" + bucketSpace +
+ ", wantedDocumentCount=" + wantedDocumentCount +
+ ", concurrency=" + concurrency +
+ '}';
}
public static Builder builder() { return new Builder(); }
@@ -369,7 +260,7 @@ public class DocumentOperationExecutor {
}
- public static class Group {
+ class Group {
private final String value;
private final String docIdPart;
@@ -390,319 +281,30 @@ public class DocumentOperationExecutor {
public String docIdPart() { return docIdPart; }
public String selection() { return selection; }
- }
-
-
- /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
- private void accept(Supplier<Result> operation, OperationContext context) {
- timeouts.add(operation, context);
- send(operation, context);
- }
-
- /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */
- private void send(Supplier<Result> operation, OperationContext context) {
- Result result = operation.get();
- switch (result.type()) {
- case SUCCESS:
- outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
- break;
- case TRANSIENT_ERROR:
- if ( ! throttled.add(operation, context))
- context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
- break;
- default:
- log.log(WARNING, "Unknown result type '" + result.type() + "'");
- case FATAL_ERROR: // intentional fallthrough
- context.error(ERROR, result.getError().getMessage());
- }
- }
-
- private void handle(Response response) {
- outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
- }
-
- private static ErrorType toErrorType(Response.Outcome outcome) {
- switch (outcome) {
- case NOT_FOUND:
- return NOT_FOUND;
- case CONDITION_FAILED:
- return PRECONDITION_FAILED;
- default:
- log.log(WARNING, "Unexpected response outcome: " + outcome);
- case ERROR: // intentional fallthrough
- return ERROR;
- }
- }
-
-
- /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */
- private static class Context<T> {
-
- private final AtomicBoolean handled = new AtomicBoolean();
- private final BiConsumer<ErrorType, String> onError;
- private final Consumer<T> onSuccess;
-
- Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) {
- this.onError = onError;
- this.onSuccess = onSuccess;
- }
-
- void error(ErrorType type, String message) {
- if ( ! handled.getAndSet(true))
- onError.accept(type, message);
- }
-
- void success(T result) {
- if ( ! handled.getAndSet(true))
- onSuccess.accept(result);
- }
-
- boolean handled() {
- return handled.get();
- }
-
- }
-
-
- private static class Completion {
-
- private final OperationContext context;
- private final Response response;
-
- private Completion(OperationContext context, Response response) {
- this.context = context;
- this.response = response;
- }
-
- static Completion of(OperationContext context) {
- return new Completion(requireNonNull(context), null);
- }
-
- static Completion of(Response response) {
- return new Completion(null, requireNonNull(response));
- }
-
- Completion merge(Completion other) {
- if (context == null)
- complete(other.context, response);
- else
- complete(context, other.response);
- return null;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Group group = (Group) o;
+ return value.equals(group.value) &&
+ docIdPart.equals(group.docIdPart) &&
+ selection.equals(group.selection);
}
- private void complete(OperationContext context, Response response) {
- if (response.isSuccess())
- context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
- : Optional.empty());
- else
- context.error(toErrorType(response.outcome()), response.getTextMessage());
- }
-
- }
-
-
- /**
- * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
- *
- * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
- * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
- */
- static class DelayQueue {
-
- private final long maxSize;
- private final Clock clock;
- private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
- private final AtomicLong size = new AtomicLong(0);
- private final Thread maintainer;
- private final Duration delay;
- private final long defaultWaitMillis;
-
- public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
- if (maxSize < 0)
- throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
- if (delay.isNegative())
- throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
-
- this.maxSize = maxSize;
- this.delay = delay;
- this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
- this.clock = requireNonNull(clock);
- this.maintainer = new Thread(() -> maintain(action));
- this.maintainer.start();
- }
-
- boolean add(Supplier<Result> operation, OperationContext context) {
- if (size.incrementAndGet() > maxSize) {
- size.decrementAndGet();
- return false;
- }
- return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, docIdPart, selection);
}
- long size() { return size.get(); }
-
- Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
- ExecutorService shutdownService = Executors.newSingleThreadExecutor();
- Future<?> future = shutdownService.submit(() -> {
- try {
- Thread.sleep(grace.toMillis());
- }
- finally {
- maintainer.interrupt();
- for (Delayed delayed; (delayed = queue.poll()) != null; ) {
- size.decrementAndGet();
- onShutdown.accept(delayed.context());
- }
- }
- return null;
- });
- shutdownService.shutdown();
- return future;
+ @Override
+ public String toString() {
+ return "Group{" +
+ "value='" + value + '\'' +
+ ", docIdPart='" + docIdPart + '\'' +
+ ", selection='" + selection + '\'' +
+ '}';
}
- /**
- * Repeatedly loops through the queue, evicting already handled entries and processing those
- * which have become ready since last time, then waits until new items are guaranteed to be ready,
- * or until it's time for a new run just to ensure GC of handled entries.
- * The entries are assumed to always be added to the back of the queue, with the same delay.
- * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
- * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
- */
- private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) {
- while ( ! Thread.currentThread().isInterrupted()) {
- try {
- Instant waitUntil = null;
- Iterator<Delayed> operations = queue.iterator();
- while (operations.hasNext()) {
- Delayed delayed = operations.next();
- // Already handled: remove and continue.
- if (delayed.context().handled()) {
- operations.remove();
- size.decrementAndGet();
- continue;
- }
- // Ready for action: remove from queue and run.
- if (delayed.readyAt().isBefore(clock.instant())) {
- operations.remove();
- size.decrementAndGet();
- action.accept(delayed.operation(), delayed.context());
- continue;
- }
- // Not yet ready for action: keep time to wake up again.
- waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
- }
- long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
- synchronized (this) {
- do {
- notify();
- wait(Math.max(0, waitUntilMillis - clock.millis()));
- }
- while (clock.millis() < waitUntilMillis);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- catch (Exception e) {
- log.log(SEVERE, "Exception caught by delay queue maintainer", e);
- }
- }
- }
- }
-
-
- private static class Delayed {
-
- private final Supplier<Result> operation;
- private final OperationContext context;
- private final Instant readyAt;
-
- Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
- this.readyAt = requireNonNull(readyAt);
- this.context = requireNonNull(context);
- this.operation = requireNonNull(operation);
- }
-
- Supplier<Result> operation() { return operation; }
- OperationContext context() { return context; }
- Instant readyAt() { return readyAt; }
-
- }
-
-
- static class StorageCluster {
-
- private final String name;
- private final String configId;
- private final Map<String, String> documentBuckets;
-
- StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
- this.name = requireNonNull(name);
- this.configId = requireNonNull(configId);
- this.documentBuckets = Map.copyOf(documentBuckets);
- }
-
- String name() { return name; }
- String configId() { return configId; }
- String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
- Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
-
- }
-
-
- static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
- if (clusters.isEmpty())
- throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled");
-
- return wanted.map(cluster -> {
- if ( ! clusters.containsKey(cluster))
- throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.get(cluster);
- }).orElseGet(() -> {
- if (clusters.size() > 1)
- throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
- String.join("', '", clusters.keySet()) + "'");
-
- return clusters.values().iterator().next();
- });
- }
-
- private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
- List<String> bucketSpaces, Optional<String> bucketSpace) {
- return documentType.map(type -> cluster.bucketOf(type)
- .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
- "' is not mapped to a known bucket space")))
- .or(() -> bucketSpace.map(space -> {
- if ( ! bucketSpaces.contains(space))
- throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
- String.join(", ", bucketSpaces));
- return space;
- }))
- .orElse(FixedBucketSpaces.defaultSpace());
- }
-
-
-
- private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
- return clusters.storage().stream()
- .collect(toUnmodifiableMap(storage -> storage.name(),
- storage -> new StorageCluster(storage.name(),
- storage.configid(),
- buckets.cluster(storage.name())
- .documentType().entrySet().stream()
- .collect(toMap(entry -> entry.getKey(),
- entry -> entry.getValue().bucketSpace())))));
- }
-
-
- // Visible for testing.
- AsyncSession asyncSession() { return asyncSession; }
- Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
- void notifyMaintainers() throws InterruptedException {
- synchronized (throttled) { throttled.notify(); throttled.wait(); }
- synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
}
}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
new file mode 100644
index 00000000000..53beae2832f
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java
@@ -0,0 +1,515 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.FixedBucketSpaces;
+import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
+import com.yahoo.documentapi.DocumentAccess;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.DocumentResponse;
+import com.yahoo.documentapi.DumpVisitorDataHandler;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.Response;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED;
+import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.SEVERE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableMap;
+
+/**
+ * Encapsulates a document access and supports running asynchronous document
+ * operations and visits against this, with retries and optional timeouts.
+ *
+ * @author jonmv
+ */
+public class DocumentOperationExecutorImpl implements DocumentOperationExecutor {
+
+ private static final Logger log = Logger.getLogger(DocumentOperationExecutorImpl.class.getName());
+
+ private final Duration visitTimeout;
+ private final long maxThrottled;
+ private final DocumentAccess access;
+ private final AsyncSession asyncSession;
+ private final Map<String, StorageCluster> clusters;
+ private final Clock clock;
+ private final DelayQueue throttled;
+ private final DelayQueue timeouts;
+ private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>();
+ private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
+
+ public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig,
+ DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) {
+ this(Duration.ofMillis(executorConfig.resendDelayMillis()),
+ Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()),
+ Duration.ofSeconds(executorConfig.visitTimeoutSeconds()),
+ executorConfig.maxThrottled(),
+ access,
+ parseClusters(clustersConfig, bucketsConfig),
+ clock);
+ }
+
+ DocumentOperationExecutorImpl(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled,
+ DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) {
+ this.visitTimeout = requireNonNull(visitTimeout);
+ this.maxThrottled = maxThrottled;
+ this.access = requireNonNull(access);
+ this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle));
+ this.clock = requireNonNull(clock);
+ this.clusters = Map.copyOf(clusters);
+ this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock);
+ this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock);
+ }
+
+ private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) {
+ if (options.cluster.isEmpty() && options.documentType.isEmpty())
+ throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+
+ VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection,
+ options.documentType,
+ options.namespace.map(value -> "id.namespace=='" + value + "'"),
+ options.group.map(Group::selection))
+ .flatMap(Optional::stream)
+ .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
+ StringJoiner::add,
+ StringJoiner::merge)
+ .toString());
+
+ options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
+ parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
+ options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
+ parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10));
+ options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1)));
+ parameters.setTimeoutMs(visitTimeout.toMillis());
+ parameters.visitInconsistentBuckets(true);
+ parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
+
+ StorageCluster storageCluster = resolveCluster(options.cluster, clusters);
+ parameters.setRoute(storageCluster.route());
+ parameters.setBucketSpace(resolveBucket(storageCluster,
+ options.documentType,
+ List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
+ options.bucketSpace));
+
+ return parameters;
+ }
+
+ /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */
+ @Override
+ public void shutdown() {
+ long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli();
+ visits.values().forEach(VisitorSession::destroy);
+ Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30),
+ context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown"));
+ Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40),
+ context -> context.error(TIMEOUT, "Timed out due to shutdown"));
+ try {
+ throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throttleShutdown.cancel(true);
+ throttleShutdown.cancel(true);
+ log.log(WARNING, "Exception shutting down " + getClass().getName(), e);
+ }
+ }
+
+ @Override
+ public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.get(id, parameters), context);
+ }
+
+ @Override
+ public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.put(put, parameters), context);
+ }
+
+ @Override
+ public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.update(update, parameters), context);
+ }
+
+ @Override
+ public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ accept(() -> asyncSession.remove(id, parameters), context);
+ }
+
+ @Override
+ public void visit(VisitorOptions options, VisitOperationsContext context) {
+ try {
+ AtomicBoolean done = new AtomicBoolean(false);
+ VisitorParameters parameters = asParameters(options, clusters, visitTimeout);
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); }
+ @Override public void onRemove(DocumentId id) { } // We don't visit removes here.
+ });
+ parameters.setControlHandler(new VisitorControlHandler() {
+ @Override public void onDone(CompletionCode code, String message) {
+ super.onDone(code, message);
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets())
+ context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout);
+ case SUCCESS: // intentional fallthrough
+ case ABORTED:
+ context.success(Optional.ofNullable(getProgress())
+ .filter(progress -> ! progress.isFinished())
+ .map(ProgressToken::serializeToString));
+ break;
+ default:
+ context.error(ERROR, message != null ? message : "Visiting failed");
+ }
+ done.set(true); // This may be reached before dispatching thread is done putting us in the map.
+ visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; });
+ }
+ });
+ visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ if (done.get())
+ visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; });
+ }
+ catch (IllegalArgumentException | ParseException e) {
+ context.error(BAD_REQUEST, Exceptions.toMessageString(e));
+ }
+ catch (RuntimeException e) {
+ context.error(ERROR, Exceptions.toMessageString(e));
+ }
+ }
+
+ @Override
+ public String routeToCluster(String cluster) {
+ return resolveCluster(Optional.of(cluster), clusters).route();
+ }
+
+ /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */
+ private void accept(Supplier<Result> operation, OperationContext context) {
+ timeouts.add(operation, context);
+ send(operation, context);
+ }
+
+ /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */
+ private void send(Supplier<Result> operation, OperationContext context) {
+ Result result = operation.get();
+ switch (result.type()) {
+ case SUCCESS:
+ outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge);
+ break;
+ case TRANSIENT_ERROR:
+ if ( ! throttled.add(operation, context))
+ context.error(OVERLOAD, maxThrottled + " requests already in retry queue");
+ break;
+ default:
+ log.log(WARNING, "Unknown result type '" + result.type() + "'");
+ case FATAL_ERROR: // intentional fallthrough
+ context.error(ERROR, result.getError().getMessage());
+ }
+ }
+
+ private void handle(Response response) {
+ outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge);
+ }
+
+ private static ErrorType toErrorType(Response.Outcome outcome) {
+ switch (outcome) {
+ case NOT_FOUND:
+ return NOT_FOUND;
+ case CONDITION_FAILED:
+ return PRECONDITION_FAILED;
+ default:
+ log.log(WARNING, "Unexpected response outcome: " + outcome);
+ case ERROR: // intentional fallthrough
+ return ERROR;
+ }
+ }
+
+
+ private static class Completion {
+
+ private final OperationContext context;
+ private final Response response;
+
+ private Completion(OperationContext context, Response response) {
+ this.context = context;
+ this.response = response;
+ }
+
+ static Completion of(OperationContext context) {
+ return new Completion(requireNonNull(context), null);
+ }
+
+ static Completion of(Response response) {
+ return new Completion(null, requireNonNull(response));
+ }
+
+ Completion merge(Completion other) {
+ if (context == null)
+ complete(other.context, response);
+ else
+ complete(context, other.response);
+ return null;
+ }
+
+ private void complete(OperationContext context, Response response) {
+ if (response.isSuccess())
+ context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument())
+ : Optional.empty());
+ else
+ context.error(toErrorType(response.outcome()), response.getTextMessage());
+ }
+
+ }
+
+
+ /**
+ * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them.
+ *
+ * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility
+ * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions.
+ */
+ static class DelayQueue {
+
+ private final long maxSize;
+ private final Clock clock;
+ private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>();
+ private final AtomicLong size = new AtomicLong(0);
+ private final Thread maintainer;
+ private final Duration delay;
+ private final long defaultWaitMillis;
+
+ public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) {
+ if (maxSize < 0)
+ throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize);
+ if (delay.isNegative())
+ throw new IllegalArgumentException("Delay cannot be negative, but was " + delay);
+
+ this.maxSize = maxSize;
+ this.delay = delay;
+ this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts.
+ this.clock = requireNonNull(clock);
+ this.maintainer = new Thread(() -> maintain(action));
+ this.maintainer.start();
+ }
+
+ boolean add(Supplier<Result> operation, OperationContext context) {
+ if (size.incrementAndGet() > maxSize) {
+ size.decrementAndGet();
+ return false;
+ }
+ return queue.add(new Delayed(clock.instant().plus(delay), operation, context));
+ }
+
+ long size() { return size.get(); }
+
+ Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) {
+ ExecutorService shutdownService = Executors.newSingleThreadExecutor();
+ Future<?> future = shutdownService.submit(() -> {
+ try {
+ long doomMillis = clock.millis() + grace.toMillis();
+ while (size.get() > 0 && clock.millis() < doomMillis)
+ Thread.sleep(100);
+ }
+ finally {
+ maintainer.interrupt();
+ for (Delayed delayed; (delayed = queue.poll()) != null; ) {
+ size.decrementAndGet();
+ onShutdown.accept(delayed.context());
+ }
+ }
+ return null;
+ });
+ shutdownService.shutdown();
+ return future;
+ }
+
+ /**
+ * Repeatedly loops through the queue, evicting already handled entries and processing those
+ * which have become ready since last time, then waits until new items are guaranteed to be ready,
+ * or until it's time for a new run just to ensure GC of handled entries.
+ * The entries are assumed to always be added to the back of the queue, with the same delay.
+ * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time
+ * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first.
+ */
+ private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) {
+ while ( ! Thread.currentThread().isInterrupted()) {
+ try {
+ Instant waitUntil = null;
+ Iterator<Delayed> operations = queue.iterator();
+ while (operations.hasNext()) {
+ Delayed delayed = operations.next();
+ // Already handled: remove and continue.
+ if (delayed.context().handled()) {
+ operations.remove();
+ size.decrementAndGet();
+ continue;
+ }
+ // Ready for action: remove from queue and run.
+ if (delayed.readyAt().isBefore(clock.instant())) {
+ operations.remove();
+ size.decrementAndGet();
+ action.accept(delayed.operation(), delayed.context());
+ continue;
+ }
+ // Not yet ready for action: keep time to wake up again.
+ waitUntil = waitUntil != null ? waitUntil : delayed.readyAt();
+ }
+ long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis;
+ synchronized (this) {
+ do {
+ notify();
+ wait(Math.max(0, waitUntilMillis - clock.millis()));
+ }
+ while (clock.millis() < waitUntilMillis);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ catch (Exception e) {
+ log.log(SEVERE, "Exception caught by delay queue maintainer", e);
+ }
+ }
+ }
+ }
+
+
+ private static class Delayed {
+
+ private final Supplier<Result> operation;
+ private final OperationContext context;
+ private final Instant readyAt;
+
+ Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) {
+ this.readyAt = requireNonNull(readyAt);
+ this.context = requireNonNull(context);
+ this.operation = requireNonNull(operation);
+ }
+
+ Supplier<Result> operation() { return operation; }
+ OperationContext context() { return context; }
+ Instant readyAt() { return readyAt; }
+
+ }
+
+
+ static class StorageCluster {
+
+ private final String name;
+ private final String configId;
+ private final Map<String, String> documentBuckets;
+
+ StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
+ this.name = requireNonNull(name);
+ this.configId = requireNonNull(configId);
+ this.documentBuckets = Map.copyOf(documentBuckets);
+ }
+
+ String name() { return name; }
+ String configId() { return configId; }
+ String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
+ Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
+
+ }
+
+
+ static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
+ if (clusters.isEmpty())
+ throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled");
+
+ return wanted.map(cluster -> {
+ if ( ! clusters.containsKey(cluster))
+ throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.get(cluster);
+ }).orElseGet(() -> {
+ if (clusters.size() > 1)
+ throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.values().iterator().next();
+ });
+ }
+
+ private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
+ List<String> bucketSpaces, Optional<String> bucketSpace) {
+ return documentType.map(type -> cluster.bucketOf(type)
+ .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
+ "' is not mapped to a known bucket space")))
+ .or(() -> bucketSpace.map(space -> {
+ if ( ! bucketSpaces.contains(space))
+ throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
+ String.join(", ", bucketSpaces));
+ return space;
+ }))
+ .orElse(FixedBucketSpaces.defaultSpace());
+ }
+
+
+
+ private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
+ return clusters.storage().stream()
+ .collect(toUnmodifiableMap(storage -> storage.name(),
+ storage -> new StorageCluster(storage.name(),
+ storage.configid(),
+ buckets.cluster(storage.name())
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> entry.getKey(),
+ entry -> entry.getValue().bucketSpace())))));
+ }
+
+
+ // Visible for testing.
+ AsyncSession asyncSession() { return asyncSession; }
+ Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); }
+ void notifyMaintainers() throws InterruptedException {
+ synchronized (throttled) { throttled.notify(); throttled.wait(); }
+ synchronized (timeouts) { timeouts.notify(); timeouts.wait(); }
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 96ea6c08f86..2b037b9fcee 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -22,6 +22,7 @@ import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl;
import com.yahoo.documentapi.DocumentOperationParameters;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
@@ -124,7 +125,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(clock,
- new DocumentOperationExecutor(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
+ new DocumentOperationExecutorImpl(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
new DocumentOperationParser(documentManagerConfig),
metric,
metricReceiver);
@@ -552,7 +553,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
- private static class DocumentOperationParser {
+ static class DocumentOperationParser {
private static final JsonFactory jsonFactory = new JsonFactory();
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java
new file mode 100644
index 00000000000..95c90e1a4ca
--- /dev/null
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java
@@ -0,0 +1,82 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi;
+
+import com.yahoo.document.DocumentGet;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.documentapi.DocumentOperationParameters;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * @author jonmv
+ */
+public class DocumentOperationExecutorMock implements DocumentOperationExecutor {
+
+ final AtomicReference<DocumentOperation> lastOperation = new AtomicReference<>();
+ final AtomicReference<DocumentOperationParameters> lastParameters = new AtomicReference<>();
+ final AtomicReference<OperationContext> lastOperationContext = new AtomicReference<>();
+ final AtomicReference<VisitorOptions> lastOptions = new AtomicReference<>();
+ final AtomicReference<VisitOperationsContext> lastVisitContext = new AtomicReference<>();
+
+ @Override
+ public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(new DocumentGet(id), parameters, context);
+ }
+
+ @Override
+ public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(put, parameters, context);
+ }
+
+ @Override
+ public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(update, parameters, context);
+ }
+
+ @Override
+ public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) {
+ setLastOperation(new DocumentRemove(id), parameters, context);
+ }
+
+ @Override
+ public void visit(VisitorOptions options, VisitOperationsContext context) {
+ lastOptions.set(options);
+ lastVisitContext.set(context);
+ }
+
+ @Override
+ public String routeToCluster(String cluster) {
+ return "route-to-" + cluster;
+ }
+
+ public DocumentOperation lastOperation() {
+ return lastOperation.get();
+ }
+
+ public DocumentOperationParameters lastParameters() {
+ return lastParameters.get();
+ }
+
+ public OperationContext lastOperationContext() {
+ return lastOperationContext.get();
+ }
+
+ public VisitorOptions lastOptions() {
+ return lastOptions.get();
+ }
+
+ public VisitOperationsContext lastVisitContext() {
+ return lastVisitContext.get();
+ }
+
+ private void setLastOperation(DocumentOperation operation, DocumentOperationParameters parameters, OperationContext context) {
+ lastOperation.set(operation);
+ lastParameters.set(parameters);
+ lastOperationContext.set(context);
+ }
+
+}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
index cb28fdf765d..3d67ff93578 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java
@@ -1,17 +1,18 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.document.restapi;
+import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentType;
-import com.yahoo.document.config.DocumentmanagerConfig;
-import com.yahoo.document.restapi.DocumentOperationExecutor.DelayQueue;
+import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.StorageCluster;
import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl.StorageCluster;
+import com.yahoo.document.restapi.DocumentOperationExecutorImpl.DelayQueue;
import com.yahoo.documentapi.DocumentAccessParams;
import com.yahoo.documentapi.Result;
import com.yahoo.documentapi.VisitorControlHandler;
@@ -19,6 +20,7 @@ import com.yahoo.documentapi.local.LocalAsyncSession;
import com.yahoo.documentapi.local.LocalDocumentAccess;
import com.yahoo.searchdefinition.derived.Deriver;
import com.yahoo.test.ManualClock;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -28,7 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
@@ -54,6 +56,22 @@ import static org.junit.Assert.fail;
*/
public class DocumentOperationExecutorTest {
+ final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder()
+ .cluster("content",
+ new AllClustersBucketSpacesConfig.Cluster.Builder()
+ .documentType("music",
+ new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder()
+ .bucketSpace(FixedBucketSpaces.defaultSpace())))
+ .build();
+ final ClusterListConfig clusterConfig = new ClusterListConfig.Builder()
+ .storage(new ClusterListConfig.Storage.Builder().configid("config-id")
+ .name("content"))
+ .build();
+ final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder()
+ .resendDelayMillis(10)
+ .defaultTimeoutSeconds(1)
+ .maxThrottled(2)
+ .build();
final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content",
"config-id",
Map.of("music", "route")));
@@ -63,7 +81,7 @@ public class DocumentOperationExecutorTest {
final List<String> tokens = new ArrayList<>();
ManualClock clock;
LocalDocumentAccess access;
- DocumentOperationExecutor executor;
+ DocumentOperationExecutorImpl executor;
DocumentType musicType;
Document doc1;
Document doc2;
@@ -88,13 +106,7 @@ public class DocumentOperationExecutorTest {
public void setUp() {
clock = new ManualClock();
access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(Deriver.getDocumentManagerConfig("src/test/cfg/music.sd").build()));
- executor = new DocumentOperationExecutor(Duration.ofMillis(10), // Resend delay
- Duration.ofMillis(60), // Operation timeout
- Duration.ofMillis(100), // Visitor timeout
- 2, // Max documents in retry queue
- access,
- clusters,
- clock);
+ executor = new DocumentOperationExecutorImpl(clusterConfig, bucketConfig, executorConfig, access, clock);
received.clear();
errors.clear();
tokens.clear();
@@ -121,17 +133,17 @@ public class DocumentOperationExecutorTest {
catch (IllegalArgumentException e) {
assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage());
}
- assertEquals("content", DocumentOperationExecutor.resolveCluster(Optional.empty(), clusters).name());
+ assertEquals("content", DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), clusters).name());
try {
- DocumentOperationExecutor.resolveCluster(Optional.empty(), Map.of());
+ DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of());
fail("No clusters should fail");
}
catch (IllegalArgumentException e) {
assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage());
}
try {
- DocumentOperationExecutor.resolveCluster(Optional.empty(), Map.of("one", new StorageCluster("one", "one-config", Map.of()),
- "two", new StorageCluster("two", "two-config", Map.of())));
+ DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of("one", new StorageCluster("one", "one-config", Map.of()),
+ "two", new StorageCluster("two", "two-config", Map.of())));
fail("More than one cluster and no document type should fail");
}
catch (IllegalArgumentException e) {
@@ -183,7 +195,7 @@ public class DocumentOperationExecutorTest {
access.setPhaser(phaser);
executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts.
- // Put 1 times out after 70 ms, Put 2 succeeds after 70 ms
+ // Put 1 times out after 1010 ms, Put 2 succeeds after 1010 ms
executor.put(new DocumentPut(doc1), parameters(), operationContext());
clock.advance(Duration.ofMillis(20));
executor.put(new DocumentPut(doc2), parameters(), operationContext());
@@ -191,7 +203,7 @@ public class DocumentOperationExecutorTest {
assertEquals(List.of(), received);
assertEquals(List.of(), errors);
- clock.advance(Duration.ofMillis(60));
+ clock.advance(Duration.ofMillis(990));
executor.notifyMaintainers();
phaser.arrive(); // Let responses flow!
phaser.arriveAndAwaitAdvance(); // Wait for responses to be delivered. <3 Phaser <3
@@ -200,7 +212,7 @@ public class DocumentOperationExecutorTest {
session().setResultType(Result.ResultType.TRANSIENT_ERROR);
executor.put(new DocumentPut(doc3), parameters(), operationContext());
- clock.advance(Duration.ofMillis(50));
+ clock.advance(Duration.ofMillis(990));
executor.notifyMaintainers(); // Retry throttled operation.
clock.advance(Duration.ofMillis(20));
executor.notifyMaintainers(); // Time out throttled operation.
@@ -293,14 +305,16 @@ public class DocumentOperationExecutorTest {
.build(),
visitContext());
phaser.arriveAndAwaitAdvance(); // First document pending
+ CountDownLatch latch = new CountDownLatch(1);
Thread shutdownThread = new Thread(() -> {
executor.shutdown();
- phaser.register();
- phaser.awaitAdvance(phaser.arriveAndDeregister());
+ latch.countDown();
});
shutdownThread.start();
- shutdownThread.interrupt(); // Makes sure we don't wait for grace period in shutting down maintainers.
- phaser.awaitAdvance(phaser.arriveAndDeregister());
+ clock.advance(Duration.ofMillis(100));
+ executor.notifyMaintainers(); // Purge timeout operations so maintainers can shut down quickly.
+ latch.await(); // Make sure visit session is shut down before next document is considered.
+ phaser.awaitAdvance(phaser.arriveAndDeregister()); // See above.
for (VisitorControlHandler session : executor.visitorSessions()) {
session.waitUntilDone();
}