diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-29 19:16:54 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 10:23:35 +0200 |
commit | 93f607279f17812125f95ee6c3203495c5264106 (patch) | |
tree | 46ab48569a5dc8ff0cc729874c1d60d84c997080 /vespaclient-container-plugin | |
parent | 96c27a8107d572f624018f367a369989efa74f84 (diff) |
Separate out interface for DocumentOperationExecutor
Diffstat (limited to 'vespaclient-container-plugin')
5 files changed, 748 insertions, 534 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java index fb9a6eb5873..8b0c966c46f 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutor.java @@ -1,195 +1,53 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document.restapi; -import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentUpdate; import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.fieldset.AllFields; -import com.yahoo.document.select.parser.ParseException; -import com.yahoo.documentapi.AsyncParameters; -import com.yahoo.documentapi.AsyncSession; -import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.DocumentOperationParameters; -import com.yahoo.documentapi.DocumentResponse; -import com.yahoo.documentapi.DumpVisitorDataHandler; import com.yahoo.documentapi.ProgressToken; -import com.yahoo.documentapi.Response; -import com.yahoo.documentapi.Result; -import com.yahoo.documentapi.VisitorControlHandler; import com.yahoo.documentapi.VisitorParameters; -import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.text.Text; -import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; -import com.yahoo.yolean.Exceptions; -import java.time.Clock; import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.StringJoiner; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; -import java.util.logging.Logger; import java.util.stream.Stream; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED; -import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; -import static java.util.Objects.requireNonNull; -import static java.util.logging.Level.SEVERE; -import static java.util.logging.Level.WARNING; -import static java.util.stream.Collectors.toMap; -import static java.util.stream.Collectors.toUnmodifiableMap; - /** - * Encapsulates a document access and supports running asynchronous document - * operations and visits against this, with retries and optional timeouts. + * Wraps the document API with an executor that can retry and time out document operations, + * as well as compute the required visitor parameters for visitor sessions. * * @author jonmv */ -public class DocumentOperationExecutor { - - private static final Logger log = Logger.getLogger(DocumentOperationExecutor.class.getName()); - - private final Duration visitTimeout; - private final long maxThrottled; - private final DocumentAccess access; - private final AsyncSession asyncSession; - private final Map<String, StorageCluster> clusters; - private final Clock clock; - private final DelayQueue throttled; - private final DelayQueue timeouts; - private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>(); - private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); - - public DocumentOperationExecutor(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig, - DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) { - this(Duration.ofMillis(executorConfig.resendDelayMillis()), - Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()), - Duration.ofSeconds(executorConfig.visitTimeoutSeconds()), - executorConfig.maxThrottled(), - access, - parseClusters(clustersConfig, bucketsConfig), - clock); - } +public interface DocumentOperationExecutor { - DocumentOperationExecutor(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled, - DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) { - this.visitTimeout = requireNonNull(visitTimeout); - this.maxThrottled = maxThrottled; - this.access = requireNonNull(access); - this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle)); - this.clock = requireNonNull(clock); - this.clusters = Map.copyOf(clusters); - this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock); - this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock); - } + default void shutdown() { } - /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */ - public void shutdown() { - long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli(); - visits.values().forEach(VisitorSession::destroy); - Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30), - context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown")); - Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40), - context -> context.error(TIMEOUT, "Timed out due to shutdown")); - try { - throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); - timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); - } - catch (InterruptedException | ExecutionException | TimeoutException e) { - throttleShutdown.cancel(true); - throttleShutdown.cancel(true); - log.log(WARNING, "Exception shutting down " + getClass().getName(), e); - } - } + void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context); - public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.get(id, parameters), context); - } + void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context); - public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.put(put, parameters), context); - } + void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context); - public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.update(update, parameters), context); - } + void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context); - public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { - accept(() -> asyncSession.remove(id, parameters), context); - } - - public void visit(VisitorOptions options, VisitOperationsContext context) { - try { - AtomicBoolean done = new AtomicBoolean(false); - VisitorParameters parameters = options.asParameters(clusters, visitTimeout); - parameters.setLocalDataHandler(new DumpVisitorDataHandler() { - @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); } - @Override public void onRemove(DocumentId id) { } // We don't visit removes here. - }); - parameters.setControlHandler(new VisitorControlHandler() { - @Override public void onDone(CompletionCode code, String message) { - super.onDone(code, message); - switch (code) { - case TIMEOUT: - if ( ! hasVisitedAnyBuckets()) - context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout); - case SUCCESS: // intentional fallthrough - case ABORTED: - context.success(Optional.ofNullable(getProgress()) - .filter(progress -> ! progress.isFinished()) - .map(ProgressToken::serializeToString)); - break; - default: - context.error(ERROR, message != null ? message : "Visiting failed"); - } - done.set(true); // This may be reached before dispatching thread is done putting us in the map. - visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; }); - } - }); - visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); - if (done.get()) - visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; }); - } - catch (IllegalArgumentException | ParseException e) { - context.error(BAD_REQUEST, Exceptions.toMessageString(e)); - } - catch (RuntimeException e) { - context.error(ERROR, Exceptions.toMessageString(e)); - } - } - - public String routeToCluster(String cluster) { - return resolveCluster(Optional.of(cluster), clusters).route(); - } + void visit(VisitorOptions options, VisitOperationsContext context); + String routeToCluster(String cluster); - public enum ErrorType { + enum ErrorType { OVERLOAD, NOT_FOUND, PRECONDITION_FAILED, @@ -199,8 +57,37 @@ public class DocumentOperationExecutor { } + /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */ + class Context<T> { + + private final AtomicBoolean handled = new AtomicBoolean(); + private final BiConsumer<ErrorType, String> onError; + private final Consumer<T> onSuccess; + + Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) { + this.onError = onError; + this.onSuccess = onSuccess; + } + + public void error(ErrorType type, String message) { + if ( ! handled.getAndSet(true)) + onError.accept(type, message); + } + + public void success(T result) { + if ( ! handled.getAndSet(true)) + onSuccess.accept(result); + } + + public boolean handled() { + return handled.get(); + } + + } + + /** Context for reacting to the progress of a visitor session. Completion signalled by an optional progress token. */ - public static class VisitOperationsContext extends Context<Optional<String>> { + class VisitOperationsContext extends Context<Optional<String>> { private final Consumer<Document> onDocument; @@ -209,7 +96,7 @@ public class DocumentOperationExecutor { this.onDocument = onDocument; } - void document(Document document) { + public void document(Document document) { if ( ! handled()) onDocument.accept(document); } @@ -218,7 +105,7 @@ public class DocumentOperationExecutor { /** Context for a document operation. */ - public static class OperationContext extends Context<Optional<Document>> { + class OperationContext extends Context<Optional<Document>> { public OperationContext(BiConsumer<ErrorType, String> onError, Consumer<Optional<Document>> onSuccess) { super(onError, onSuccess); @@ -227,22 +114,22 @@ public class DocumentOperationExecutor { } - public static class VisitorOptions { + class VisitorOptions { - private final Optional<String> cluster; - private final Optional<String> namespace; - private final Optional<String> documentType; - private final Optional<Group> group; - private final Optional<String> selection; - private final Optional<String> fieldSet; - private final Optional<String> continuation; - private final Optional<String> bucketSpace; - private final Optional<Integer> wantedDocumentCount; - private final Optional<Integer> concurrency; + final Optional<String> cluster; + final Optional<String> namespace; + final Optional<String> documentType; + final Optional<Group> group; + final Optional<String> selection; + final Optional<String> fieldSet; + final Optional<String> continuation; + final Optional<String> bucketSpace; + final Optional<Integer> wantedDocumentCount; + final Optional<Integer> concurrency; private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace, Optional<Group> group, Optional<String> selection, Optional<String> fieldSet, - Optional<String> continuation,Optional<String> bucketSpace, + Optional<String> continuation, Optional<String> bucketSpace, Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) { this.cluster = cluster; this.namespace = namespace; @@ -256,38 +143,42 @@ public class DocumentOperationExecutor { this.concurrency = concurrency; } - private VisitorParameters asParameters(Map<String, StorageCluster> clusters, Duration visitTimeout) { - if (cluster.isEmpty() && documentType.isEmpty()) - throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); - - VisitorParameters parameters = new VisitorParameters(Stream.of(selection, - documentType, - namespace.map(value -> "id.namespace=='" + value + "'"), - group.map(Group::selection)) - .flatMap(Optional::stream) - .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right - StringJoiner::add, - StringJoiner::merge) - .toString()); - - continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); - parameters.setFieldSet(fieldSet.orElse(documentType.map(type -> type + ":[document]").orElse(AllFields.NAME))); - wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); }); - parameters.setMaxTotalHits(wantedDocumentCount.orElse(1 << 10)); - concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); }); - parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(concurrency.orElse(1))); - parameters.setTimeoutMs(visitTimeout.toMillis()); - parameters.visitInconsistentBuckets(true); - parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); - - StorageCluster storageCluster = resolveCluster(cluster, clusters); - parameters.setRoute(storageCluster.route()); - parameters.setBucketSpace(resolveBucket(storageCluster, - documentType, - List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), - bucketSpace)); - - return parameters; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VisitorOptions that = (VisitorOptions) o; + return cluster.equals(that.cluster) && + namespace.equals(that.namespace) && + documentType.equals(that.documentType) && + group.equals(that.group) && + selection.equals(that.selection) && + fieldSet.equals(that.fieldSet) && + continuation.equals(that.continuation) && + bucketSpace.equals(that.bucketSpace) && + wantedDocumentCount.equals(that.wantedDocumentCount) && + concurrency.equals(that.concurrency); + } + + @Override + public int hashCode() { + return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency); + } + + @Override + public String toString() { + return "VisitorOptions{" + + "cluster=" + cluster + + ", namespace=" + namespace + + ", documentType=" + documentType + + ", group=" + group + + ", selection=" + selection + + ", fieldSet=" + fieldSet + + ", continuation=" + continuation + + ", bucketSpace=" + bucketSpace + + ", wantedDocumentCount=" + wantedDocumentCount + + ", concurrency=" + concurrency + + '}'; } public static Builder builder() { return new Builder(); } @@ -369,7 +260,7 @@ public class DocumentOperationExecutor { } - public static class Group { + class Group { private final String value; private final String docIdPart; @@ -390,319 +281,30 @@ public class DocumentOperationExecutor { public String docIdPart() { return docIdPart; } public String selection() { return selection; } - } - - - /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */ - private void accept(Supplier<Result> operation, OperationContext context) { - timeouts.add(operation, context); - send(operation, context); - } - - /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */ - private void send(Supplier<Result> operation, OperationContext context) { - Result result = operation.get(); - switch (result.type()) { - case SUCCESS: - outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge); - break; - case TRANSIENT_ERROR: - if ( ! throttled.add(operation, context)) - context.error(OVERLOAD, maxThrottled + " requests already in retry queue"); - break; - default: - log.log(WARNING, "Unknown result type '" + result.type() + "'"); - case FATAL_ERROR: // intentional fallthrough - context.error(ERROR, result.getError().getMessage()); - } - } - - private void handle(Response response) { - outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge); - } - - private static ErrorType toErrorType(Response.Outcome outcome) { - switch (outcome) { - case NOT_FOUND: - return NOT_FOUND; - case CONDITION_FAILED: - return PRECONDITION_FAILED; - default: - log.log(WARNING, "Unexpected response outcome: " + outcome); - case ERROR: // intentional fallthrough - return ERROR; - } - } - - - /** The executor will call <em>exactly one</em> callback <em>exactly once</em> for contexts submitted to it. */ - private static class Context<T> { - - private final AtomicBoolean handled = new AtomicBoolean(); - private final BiConsumer<ErrorType, String> onError; - private final Consumer<T> onSuccess; - - Context(BiConsumer<ErrorType, String> onError, Consumer<T> onSuccess) { - this.onError = onError; - this.onSuccess = onSuccess; - } - - void error(ErrorType type, String message) { - if ( ! handled.getAndSet(true)) - onError.accept(type, message); - } - - void success(T result) { - if ( ! handled.getAndSet(true)) - onSuccess.accept(result); - } - - boolean handled() { - return handled.get(); - } - - } - - - private static class Completion { - - private final OperationContext context; - private final Response response; - - private Completion(OperationContext context, Response response) { - this.context = context; - this.response = response; - } - - static Completion of(OperationContext context) { - return new Completion(requireNonNull(context), null); - } - - static Completion of(Response response) { - return new Completion(null, requireNonNull(response)); - } - - Completion merge(Completion other) { - if (context == null) - complete(other.context, response); - else - complete(context, other.response); - return null; + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Group group = (Group) o; + return value.equals(group.value) && + docIdPart.equals(group.docIdPart) && + selection.equals(group.selection); } - private void complete(OperationContext context, Response response) { - if (response.isSuccess()) - context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) - : Optional.empty()); - else - context.error(toErrorType(response.outcome()), response.getTextMessage()); - } - - } - - - /** - * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them. - * - * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility - * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions. - */ - static class DelayQueue { - - private final long maxSize; - private final Clock clock; - private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>(); - private final AtomicLong size = new AtomicLong(0); - private final Thread maintainer; - private final Duration delay; - private final long defaultWaitMillis; - - public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) { - if (maxSize < 0) - throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize); - if (delay.isNegative()) - throw new IllegalArgumentException("Delay cannot be negative, but was " + delay); - - this.maxSize = maxSize; - this.delay = delay; - this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts. - this.clock = requireNonNull(clock); - this.maintainer = new Thread(() -> maintain(action)); - this.maintainer.start(); - } - - boolean add(Supplier<Result> operation, OperationContext context) { - if (size.incrementAndGet() > maxSize) { - size.decrementAndGet(); - return false; - } - return queue.add(new Delayed(clock.instant().plus(delay), operation, context)); + @Override + public int hashCode() { + return Objects.hash(value, docIdPart, selection); } - long size() { return size.get(); } - - Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) { - ExecutorService shutdownService = Executors.newSingleThreadExecutor(); - Future<?> future = shutdownService.submit(() -> { - try { - Thread.sleep(grace.toMillis()); - } - finally { - maintainer.interrupt(); - for (Delayed delayed; (delayed = queue.poll()) != null; ) { - size.decrementAndGet(); - onShutdown.accept(delayed.context()); - } - } - return null; - }); - shutdownService.shutdown(); - return future; + @Override + public String toString() { + return "Group{" + + "value='" + value + '\'' + + ", docIdPart='" + docIdPart + '\'' + + ", selection='" + selection + '\'' + + '}'; } - /** - * Repeatedly loops through the queue, evicting already handled entries and processing those - * which have become ready since last time, then waits until new items are guaranteed to be ready, - * or until it's time for a new run just to ensure GC of handled entries. - * The entries are assumed to always be added to the back of the queue, with the same delay. - * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time - * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first. - */ - private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) { - while ( ! Thread.currentThread().isInterrupted()) { - try { - Instant waitUntil = null; - Iterator<Delayed> operations = queue.iterator(); - while (operations.hasNext()) { - Delayed delayed = operations.next(); - // Already handled: remove and continue. - if (delayed.context().handled()) { - operations.remove(); - size.decrementAndGet(); - continue; - } - // Ready for action: remove from queue and run. - if (delayed.readyAt().isBefore(clock.instant())) { - operations.remove(); - size.decrementAndGet(); - action.accept(delayed.operation(), delayed.context()); - continue; - } - // Not yet ready for action: keep time to wake up again. - waitUntil = waitUntil != null ? waitUntil : delayed.readyAt(); - } - long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis; - synchronized (this) { - do { - notify(); - wait(Math.max(0, waitUntilMillis - clock.millis())); - } - while (clock.millis() < waitUntilMillis); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - catch (Exception e) { - log.log(SEVERE, "Exception caught by delay queue maintainer", e); - } - } - } - } - - - private static class Delayed { - - private final Supplier<Result> operation; - private final OperationContext context; - private final Instant readyAt; - - Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) { - this.readyAt = requireNonNull(readyAt); - this.context = requireNonNull(context); - this.operation = requireNonNull(operation); - } - - Supplier<Result> operation() { return operation; } - OperationContext context() { return context; } - Instant readyAt() { return readyAt; } - - } - - - static class StorageCluster { - - private final String name; - private final String configId; - private final Map<String, String> documentBuckets; - - StorageCluster(String name, String configId, Map<String, String> documentBuckets) { - this.name = requireNonNull(name); - this.configId = requireNonNull(configId); - this.documentBuckets = Map.copyOf(documentBuckets); - } - - String name() { return name; } - String configId() { return configId; } - String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; } - Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } - - } - - - static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) { - if (clusters.isEmpty()) - throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled"); - - return wanted.map(cluster -> { - if ( ! clusters.containsKey(cluster)) - throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" + - String.join("', '", clusters.keySet()) + "'"); - - return clusters.get(cluster); - }).orElseGet(() -> { - if (clusters.size() > 1) - throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" + - String.join("', '", clusters.keySet()) + "'"); - - return clusters.values().iterator().next(); - }); - } - - private static String resolveBucket(StorageCluster cluster, Optional<String> documentType, - List<String> bucketSpaces, Optional<String> bucketSpace) { - return documentType.map(type -> cluster.bucketOf(type) - .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() + - "' is not mapped to a known bucket space"))) - .or(() -> bucketSpace.map(space -> { - if ( ! bucketSpaces.contains(space)) - throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " + - String.join(", ", bucketSpaces)); - return space; - })) - .orElse(FixedBucketSpaces.defaultSpace()); - } - - - - private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) { - return clusters.storage().stream() - .collect(toUnmodifiableMap(storage -> storage.name(), - storage -> new StorageCluster(storage.name(), - storage.configid(), - buckets.cluster(storage.name()) - .documentType().entrySet().stream() - .collect(toMap(entry -> entry.getKey(), - entry -> entry.getValue().bucketSpace()))))); - } - - - // Visible for testing. - AsyncSession asyncSession() { return asyncSession; } - Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); } - void notifyMaintainers() throws InterruptedException { - synchronized (throttled) { throttled.notify(); throttled.wait(); } - synchronized (timeouts) { timeouts.notify(); timeouts.wait(); } } } diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java new file mode 100644 index 00000000000..53beae2832f --- /dev/null +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/DocumentOperationExecutorImpl.java @@ -0,0 +1,515 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi; + +import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.FixedBucketSpaces; +import com.yahoo.document.fieldset.AllFields; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.AsyncParameters; +import com.yahoo.documentapi.AsyncSession; +import com.yahoo.documentapi.DocumentAccess; +import com.yahoo.documentapi.DocumentOperationParameters; +import com.yahoo.documentapi.DocumentResponse; +import com.yahoo.documentapi.DumpVisitorDataHandler; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; +import com.yahoo.yolean.Exceptions; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.logging.Logger; +import java.util.stream.Stream; + +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.BAD_REQUEST; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.ERROR; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.NOT_FOUND; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.OVERLOAD; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.PRECONDITION_FAILED; +import static com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType.TIMEOUT; +import static java.util.Objects.requireNonNull; +import static java.util.logging.Level.SEVERE; +import static java.util.logging.Level.WARNING; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toUnmodifiableMap; + +/** + * Encapsulates a document access and supports running asynchronous document + * operations and visits against this, with retries and optional timeouts. + * + * @author jonmv + */ +public class DocumentOperationExecutorImpl implements DocumentOperationExecutor { + + private static final Logger log = Logger.getLogger(DocumentOperationExecutorImpl.class.getName()); + + private final Duration visitTimeout; + private final long maxThrottled; + private final DocumentAccess access; + private final AsyncSession asyncSession; + private final Map<String, StorageCluster> clusters; + private final Clock clock; + private final DelayQueue throttled; + private final DelayQueue timeouts; + private final Map<Long, Completion> outstanding = new ConcurrentHashMap<>(); + private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); + + public DocumentOperationExecutorImpl(ClusterListConfig clustersConfig, AllClustersBucketSpacesConfig bucketsConfig, + DocumentOperationExecutorConfig executorConfig, DocumentAccess access, Clock clock) { + this(Duration.ofMillis(executorConfig.resendDelayMillis()), + Duration.ofSeconds(executorConfig.defaultTimeoutSeconds()), + Duration.ofSeconds(executorConfig.visitTimeoutSeconds()), + executorConfig.maxThrottled(), + access, + parseClusters(clustersConfig, bucketsConfig), + clock); + } + + DocumentOperationExecutorImpl(Duration resendDelay, Duration defaultTimeout, Duration visitTimeout, long maxThrottled, + DocumentAccess access, Map<String, StorageCluster> clusters, Clock clock) { + this.visitTimeout = requireNonNull(visitTimeout); + this.maxThrottled = maxThrottled; + this.access = requireNonNull(access); + this.asyncSession = access.createAsyncSession(new AsyncParameters().setResponseHandler(this::handle)); + this.clock = requireNonNull(clock); + this.clusters = Map.copyOf(clusters); + this.throttled = new DelayQueue(maxThrottled, this::send, resendDelay, clock); + this.timeouts = new DelayQueue(Long.MAX_VALUE, (__, context) -> context.error(TIMEOUT, "Timed out after " + defaultTimeout), defaultTimeout, clock); + } + + private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) { + if (options.cluster.isEmpty() && options.documentType.isEmpty()) + throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); + + VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection, + options.documentType, + options.namespace.map(value -> "id.namespace=='" + value + "'"), + options.group.map(Group::selection)) + .flatMap(Optional::stream) + .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right + StringJoiner::add, + StringJoiner::merge) + .toString()); + + options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); + parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME))); + options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); }); + parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10)); + options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); }); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1))); + parameters.setTimeoutMs(visitTimeout.toMillis()); + parameters.visitInconsistentBuckets(true); + parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); + + StorageCluster storageCluster = resolveCluster(options.cluster, clusters); + parameters.setRoute(storageCluster.route()); + parameters.setBucketSpace(resolveBucket(storageCluster, + options.documentType, + List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), + options.bucketSpace)); + + return parameters; + } + + /** Assumes this stops receiving operations roughly when this is called, then waits up to 50 seconds to drain operations. */ + @Override + public void shutdown() { + long shutdownMillis = clock.instant().plusSeconds(50).toEpochMilli(); + visits.values().forEach(VisitorSession::destroy); + Future<?> throttleShutdown = throttled.shutdown(Duration.ofSeconds(30), + context -> context.error(OVERLOAD, "Retry on overload failed due to shutdown")); + Future<?> timeoutShutdown = timeouts.shutdown(Duration.ofSeconds(40), + context -> context.error(TIMEOUT, "Timed out due to shutdown")); + try { + throttleShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); + timeoutShutdown.get(Math.max(0, shutdownMillis - clock.millis()), TimeUnit.MILLISECONDS); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throttleShutdown.cancel(true); + throttleShutdown.cancel(true); + log.log(WARNING, "Exception shutting down " + getClass().getName(), e); + } + } + + @Override + public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.get(id, parameters), context); + } + + @Override + public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.put(put, parameters), context); + } + + @Override + public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.update(update, parameters), context); + } + + @Override + public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + accept(() -> asyncSession.remove(id, parameters), context); + } + + @Override + public void visit(VisitorOptions options, VisitOperationsContext context) { + try { + AtomicBoolean done = new AtomicBoolean(false); + VisitorParameters parameters = asParameters(options, clusters, visitTimeout); + parameters.setLocalDataHandler(new DumpVisitorDataHandler() { + @Override public void onDocument(Document doc, long timeStamp) { context.document(doc); } + @Override public void onRemove(DocumentId id) { } // We don't visit removes here. + }); + parameters.setControlHandler(new VisitorControlHandler() { + @Override public void onDone(CompletionCode code, String message) { + super.onDone(code, message); + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets()) + context.error(TIMEOUT, "No buckets visited within timeout of " + visitTimeout); + case SUCCESS: // intentional fallthrough + case ABORTED: + context.success(Optional.ofNullable(getProgress()) + .filter(progress -> ! progress.isFinished()) + .map(ProgressToken::serializeToString)); + break; + default: + context.error(ERROR, message != null ? message : "Visiting failed"); + } + done.set(true); // This may be reached before dispatching thread is done putting us in the map. + visits.computeIfPresent(this, (__, session) -> { session.destroy(); return null; }); + } + }); + visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); + if (done.get()) + visits.computeIfPresent(parameters.getControlHandler(), (__, session) -> { session.destroy(); return null; }); + } + catch (IllegalArgumentException | ParseException e) { + context.error(BAD_REQUEST, Exceptions.toMessageString(e)); + } + catch (RuntimeException e) { + context.error(ERROR, Exceptions.toMessageString(e)); + } + } + + @Override + public String routeToCluster(String cluster) { + return resolveCluster(Optional.of(cluster), clusters).route(); + } + + /** Rejects operation if retry queue is full; otherwise starts a timer for the given task, and attempts to send it. */ + private void accept(Supplier<Result> operation, OperationContext context) { + timeouts.add(operation, context); + send(operation, context); + } + + /** Sends the given operation through the async session of this, enqueueing a retry if throttled, unless overloaded. */ + private void send(Supplier<Result> operation, OperationContext context) { + Result result = operation.get(); + switch (result.type()) { + case SUCCESS: + outstanding.merge(result.getRequestId(), Completion.of(context), Completion::merge); + break; + case TRANSIENT_ERROR: + if ( ! throttled.add(operation, context)) + context.error(OVERLOAD, maxThrottled + " requests already in retry queue"); + break; + default: + log.log(WARNING, "Unknown result type '" + result.type() + "'"); + case FATAL_ERROR: // intentional fallthrough + context.error(ERROR, result.getError().getMessage()); + } + } + + private void handle(Response response) { + outstanding.merge(response.getRequestId(), Completion.of(response), Completion::merge); + } + + private static ErrorType toErrorType(Response.Outcome outcome) { + switch (outcome) { + case NOT_FOUND: + return NOT_FOUND; + case CONDITION_FAILED: + return PRECONDITION_FAILED; + default: + log.log(WARNING, "Unexpected response outcome: " + outcome); + case ERROR: // intentional fallthrough + return ERROR; + } + } + + + private static class Completion { + + private final OperationContext context; + private final Response response; + + private Completion(OperationContext context, Response response) { + this.context = context; + this.response = response; + } + + static Completion of(OperationContext context) { + return new Completion(requireNonNull(context), null); + } + + static Completion of(Response response) { + return new Completion(null, requireNonNull(response)); + } + + Completion merge(Completion other) { + if (context == null) + complete(other.context, response); + else + complete(context, other.response); + return null; + } + + private void complete(OperationContext context, Response response) { + if (response.isSuccess()) + context.success(response instanceof DocumentResponse ? Optional.ofNullable(((DocumentResponse) response).getDocument()) + : Optional.empty()); + else + context.error(toErrorType(response.outcome()), response.getTextMessage()); + } + + } + + + /** + * Keeps delayed operations (retries or timeouts) until ready, at which point a bulk maintenance operation processes them. + * + * This is similar to {@link java.util.concurrent.DelayQueue}, but sacrifices the flexibility + * of using dynamic timeouts, and latency, for higher throughput and efficient (lazy) deletions. + */ + static class DelayQueue { + + private final long maxSize; + private final Clock clock; + private final ConcurrentLinkedQueue<Delayed> queue = new ConcurrentLinkedQueue<>(); + private final AtomicLong size = new AtomicLong(0); + private final Thread maintainer; + private final Duration delay; + private final long defaultWaitMillis; + + public DelayQueue(long maxSize, BiConsumer<Supplier<Result>, OperationContext> action, Duration delay, Clock clock) { + if (maxSize < 0) + throw new IllegalArgumentException("Max size cannot be negative, but was " + maxSize); + if (delay.isNegative()) + throw new IllegalArgumentException("Delay cannot be negative, but was " + delay); + + this.maxSize = maxSize; + this.delay = delay; + this.defaultWaitMillis = Math.min(delay.toMillis(), 100); // Run regularly to evict handled contexts. + this.clock = requireNonNull(clock); + this.maintainer = new Thread(() -> maintain(action)); + this.maintainer.start(); + } + + boolean add(Supplier<Result> operation, OperationContext context) { + if (size.incrementAndGet() > maxSize) { + size.decrementAndGet(); + return false; + } + return queue.add(new Delayed(clock.instant().plus(delay), operation, context)); + } + + long size() { return size.get(); } + + Future<?> shutdown(Duration grace, Consumer<OperationContext> onShutdown) { + ExecutorService shutdownService = Executors.newSingleThreadExecutor(); + Future<?> future = shutdownService.submit(() -> { + try { + long doomMillis = clock.millis() + grace.toMillis(); + while (size.get() > 0 && clock.millis() < doomMillis) + Thread.sleep(100); + } + finally { + maintainer.interrupt(); + for (Delayed delayed; (delayed = queue.poll()) != null; ) { + size.decrementAndGet(); + onShutdown.accept(delayed.context()); + } + } + return null; + }); + shutdownService.shutdown(); + return future; + } + + /** + * Repeatedly loops through the queue, evicting already handled entries and processing those + * which have become ready since last time, then waits until new items are guaranteed to be ready, + * or until it's time for a new run just to ensure GC of handled entries. + * The entries are assumed to always be added to the back of the queue, with the same delay. + * If the queue is to support random delays, the maintainer must be woken up on every insert with a ready time + * lower than the current, and the earliest sleepUntilMillis be computed, rather than simply the first. + */ + private void maintain(BiConsumer<Supplier<Result>, OperationContext> action) { + while ( ! Thread.currentThread().isInterrupted()) { + try { + Instant waitUntil = null; + Iterator<Delayed> operations = queue.iterator(); + while (operations.hasNext()) { + Delayed delayed = operations.next(); + // Already handled: remove and continue. + if (delayed.context().handled()) { + operations.remove(); + size.decrementAndGet(); + continue; + } + // Ready for action: remove from queue and run. + if (delayed.readyAt().isBefore(clock.instant())) { + operations.remove(); + size.decrementAndGet(); + action.accept(delayed.operation(), delayed.context()); + continue; + } + // Not yet ready for action: keep time to wake up again. + waitUntil = waitUntil != null ? waitUntil : delayed.readyAt(); + } + long waitUntilMillis = waitUntil != null ? waitUntil.toEpochMilli() : clock.millis() + defaultWaitMillis; + synchronized (this) { + do { + notify(); + wait(Math.max(0, waitUntilMillis - clock.millis())); + } + while (clock.millis() < waitUntilMillis); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + catch (Exception e) { + log.log(SEVERE, "Exception caught by delay queue maintainer", e); + } + } + } + } + + + private static class Delayed { + + private final Supplier<Result> operation; + private final OperationContext context; + private final Instant readyAt; + + Delayed(Instant readyAt, Supplier<Result> operation, OperationContext context) { + this.readyAt = requireNonNull(readyAt); + this.context = requireNonNull(context); + this.operation = requireNonNull(operation); + } + + Supplier<Result> operation() { return operation; } + OperationContext context() { return context; } + Instant readyAt() { return readyAt; } + + } + + + static class StorageCluster { + + private final String name; + private final String configId; + private final Map<String, String> documentBuckets; + + StorageCluster(String name, String configId, Map<String, String> documentBuckets) { + this.name = requireNonNull(name); + this.configId = requireNonNull(configId); + this.documentBuckets = Map.copyOf(documentBuckets); + } + + String name() { return name; } + String configId() { return configId; } + String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; } + Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } + + } + + + static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) { + if (clusters.isEmpty()) + throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled"); + + return wanted.map(cluster -> { + if ( ! clusters.containsKey(cluster)) + throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" + + String.join("', '", clusters.keySet()) + "'"); + + return clusters.get(cluster); + }).orElseGet(() -> { + if (clusters.size() > 1) + throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" + + String.join("', '", clusters.keySet()) + "'"); + + return clusters.values().iterator().next(); + }); + } + + private static String resolveBucket(StorageCluster cluster, Optional<String> documentType, + List<String> bucketSpaces, Optional<String> bucketSpace) { + return documentType.map(type -> cluster.bucketOf(type) + .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() + + "' is not mapped to a known bucket space"))) + .or(() -> bucketSpace.map(space -> { + if ( ! bucketSpaces.contains(space)) + throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " + + String.join(", ", bucketSpaces)); + return space; + })) + .orElse(FixedBucketSpaces.defaultSpace()); + } + + + + private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) { + return clusters.storage().stream() + .collect(toUnmodifiableMap(storage -> storage.name(), + storage -> new StorageCluster(storage.name(), + storage.configid(), + buckets.cluster(storage.name()) + .documentType().entrySet().stream() + .collect(toMap(entry -> entry.getKey(), + entry -> entry.getValue().bucketSpace()))))); + } + + + // Visible for testing. + AsyncSession asyncSession() { return asyncSession; } + Collection<VisitorControlHandler> visitorSessions() { return visits.keySet(); } + void notifyMaintainers() throws InterruptedException { + synchronized (throttled) { throttled.notify(); throttled.wait(); } + synchronized (timeouts) { timeouts.notify(); timeouts.wait(); } + } + +} diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 96ea6c08f86..2b037b9fcee 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -22,6 +22,7 @@ import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext; import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext; import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; import com.yahoo.document.restapi.DocumentOperationExecutorConfig; +import com.yahoo.document.restapi.DocumentOperationExecutorImpl; import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; @@ -124,7 +125,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(clock, - new DocumentOperationExecutor(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock), + new DocumentOperationExecutorImpl(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock), new DocumentOperationParser(documentManagerConfig), metric, metricReceiver); @@ -552,7 +553,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } - private static class DocumentOperationParser { + static class DocumentOperationParser { private static final JsonFactory jsonFactory = new JsonFactory(); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java new file mode 100644 index 00000000000..95c90e1a4ca --- /dev/null +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorMock.java @@ -0,0 +1,82 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.document.restapi; + +import com.yahoo.document.DocumentGet; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.documentapi.DocumentOperationParameters; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * @author jonmv + */ +public class DocumentOperationExecutorMock implements DocumentOperationExecutor { + + final AtomicReference<DocumentOperation> lastOperation = new AtomicReference<>(); + final AtomicReference<DocumentOperationParameters> lastParameters = new AtomicReference<>(); + final AtomicReference<OperationContext> lastOperationContext = new AtomicReference<>(); + final AtomicReference<VisitorOptions> lastOptions = new AtomicReference<>(); + final AtomicReference<VisitOperationsContext> lastVisitContext = new AtomicReference<>(); + + @Override + public void get(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(new DocumentGet(id), parameters, context); + } + + @Override + public void put(DocumentPut put, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(put, parameters, context); + } + + @Override + public void update(DocumentUpdate update, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(update, parameters, context); + } + + @Override + public void remove(DocumentId id, DocumentOperationParameters parameters, OperationContext context) { + setLastOperation(new DocumentRemove(id), parameters, context); + } + + @Override + public void visit(VisitorOptions options, VisitOperationsContext context) { + lastOptions.set(options); + lastVisitContext.set(context); + } + + @Override + public String routeToCluster(String cluster) { + return "route-to-" + cluster; + } + + public DocumentOperation lastOperation() { + return lastOperation.get(); + } + + public DocumentOperationParameters lastParameters() { + return lastParameters.get(); + } + + public OperationContext lastOperationContext() { + return lastOperationContext.get(); + } + + public VisitorOptions lastOptions() { + return lastOptions.get(); + } + + public VisitOperationsContext lastVisitContext() { + return lastVisitContext.get(); + } + + private void setLastOperation(DocumentOperation operation, DocumentOperationParameters parameters, OperationContext context) { + lastOperation.set(operation); + lastParameters.set(parameters); + lastOperationContext.set(context); + } + +} diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java index cb28fdf765d..3d67ff93578 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/DocumentOperationExecutorTest.java @@ -1,17 +1,18 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.document.restapi; +import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.document.Document; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentType; -import com.yahoo.document.config.DocumentmanagerConfig; -import com.yahoo.document.restapi.DocumentOperationExecutor.DelayQueue; +import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType; import com.yahoo.document.restapi.DocumentOperationExecutor.Group; import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext; -import com.yahoo.document.restapi.DocumentOperationExecutor.StorageCluster; import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext; import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; +import com.yahoo.document.restapi.DocumentOperationExecutorImpl.StorageCluster; +import com.yahoo.document.restapi.DocumentOperationExecutorImpl.DelayQueue; import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.VisitorControlHandler; @@ -19,6 +20,7 @@ import com.yahoo.documentapi.local.LocalAsyncSession; import com.yahoo.documentapi.local.LocalDocumentAccess; import com.yahoo.searchdefinition.derived.Deriver; import com.yahoo.test.ManualClock; +import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -28,7 +30,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; @@ -54,6 +56,22 @@ import static org.junit.Assert.fail; */ public class DocumentOperationExecutorTest { + final AllClustersBucketSpacesConfig bucketConfig = new AllClustersBucketSpacesConfig.Builder() + .cluster("content", + new AllClustersBucketSpacesConfig.Cluster.Builder() + .documentType("music", + new AllClustersBucketSpacesConfig.Cluster.DocumentType.Builder() + .bucketSpace(FixedBucketSpaces.defaultSpace()))) + .build(); + final ClusterListConfig clusterConfig = new ClusterListConfig.Builder() + .storage(new ClusterListConfig.Storage.Builder().configid("config-id") + .name("content")) + .build(); + final DocumentOperationExecutorConfig executorConfig = new DocumentOperationExecutorConfig.Builder() + .resendDelayMillis(10) + .defaultTimeoutSeconds(1) + .maxThrottled(2) + .build(); final Map<String, StorageCluster> clusters = Map.of("content", new StorageCluster("content", "config-id", Map.of("music", "route"))); @@ -63,7 +81,7 @@ public class DocumentOperationExecutorTest { final List<String> tokens = new ArrayList<>(); ManualClock clock; LocalDocumentAccess access; - DocumentOperationExecutor executor; + DocumentOperationExecutorImpl executor; DocumentType musicType; Document doc1; Document doc2; @@ -88,13 +106,7 @@ public class DocumentOperationExecutorTest { public void setUp() { clock = new ManualClock(); access = new LocalDocumentAccess(new DocumentAccessParams().setDocumentmanagerConfig(Deriver.getDocumentManagerConfig("src/test/cfg/music.sd").build())); - executor = new DocumentOperationExecutor(Duration.ofMillis(10), // Resend delay - Duration.ofMillis(60), // Operation timeout - Duration.ofMillis(100), // Visitor timeout - 2, // Max documents in retry queue - access, - clusters, - clock); + executor = new DocumentOperationExecutorImpl(clusterConfig, bucketConfig, executorConfig, access, clock); received.clear(); errors.clear(); tokens.clear(); @@ -121,17 +133,17 @@ public class DocumentOperationExecutorTest { catch (IllegalArgumentException e) { assertEquals("Your Vespa deployment has no content cluster 'blargh', only 'content'", e.getMessage()); } - assertEquals("content", DocumentOperationExecutor.resolveCluster(Optional.empty(), clusters).name()); + assertEquals("content", DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), clusters).name()); try { - DocumentOperationExecutor.resolveCluster(Optional.empty(), Map.of()); + DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of()); fail("No clusters should fail"); } catch (IllegalArgumentException e) { assertEquals("Your Vespa deployment has no content clusters, so the document API is not enabled", e.getMessage()); } try { - DocumentOperationExecutor.resolveCluster(Optional.empty(), Map.of("one", new StorageCluster("one", "one-config", Map.of()), - "two", new StorageCluster("two", "two-config", Map.of()))); + DocumentOperationExecutorImpl.resolveCluster(Optional.empty(), Map.of("one", new StorageCluster("one", "one-config", Map.of()), + "two", new StorageCluster("two", "two-config", Map.of()))); fail("More than one cluster and no document type should fail"); } catch (IllegalArgumentException e) { @@ -183,7 +195,7 @@ public class DocumentOperationExecutorTest { access.setPhaser(phaser); executor.notifyMaintainers(); // Make sure maintainers have gone to sleep before tests starts. - // Put 1 times out after 70 ms, Put 2 succeeds after 70 ms + // Put 1 times out after 1010 ms, Put 2 succeeds after 1010 ms executor.put(new DocumentPut(doc1), parameters(), operationContext()); clock.advance(Duration.ofMillis(20)); executor.put(new DocumentPut(doc2), parameters(), operationContext()); @@ -191,7 +203,7 @@ public class DocumentOperationExecutorTest { assertEquals(List.of(), received); assertEquals(List.of(), errors); - clock.advance(Duration.ofMillis(60)); + clock.advance(Duration.ofMillis(990)); executor.notifyMaintainers(); phaser.arrive(); // Let responses flow! phaser.arriveAndAwaitAdvance(); // Wait for responses to be delivered. <3 Phaser <3 @@ -200,7 +212,7 @@ public class DocumentOperationExecutorTest { session().setResultType(Result.ResultType.TRANSIENT_ERROR); executor.put(new DocumentPut(doc3), parameters(), operationContext()); - clock.advance(Duration.ofMillis(50)); + clock.advance(Duration.ofMillis(990)); executor.notifyMaintainers(); // Retry throttled operation. clock.advance(Duration.ofMillis(20)); executor.notifyMaintainers(); // Time out throttled operation. @@ -293,14 +305,16 @@ public class DocumentOperationExecutorTest { .build(), visitContext()); phaser.arriveAndAwaitAdvance(); // First document pending + CountDownLatch latch = new CountDownLatch(1); Thread shutdownThread = new Thread(() -> { executor.shutdown(); - phaser.register(); - phaser.awaitAdvance(phaser.arriveAndDeregister()); + latch.countDown(); }); shutdownThread.start(); - shutdownThread.interrupt(); // Makes sure we don't wait for grace period in shutting down maintainers. - phaser.awaitAdvance(phaser.arriveAndDeregister()); + clock.advance(Duration.ofMillis(100)); + executor.notifyMaintainers(); // Purge timeout operations so maintainers can shut down quickly. + latch.await(); // Make sure visit session is shut down before next document is considered. + phaser.awaitAdvance(phaser.arriveAndDeregister()); // See above. for (VisitorControlHandler session : executor.visitorSessions()) { session.waitUntilDone(); } |