aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-10-13 13:51:14 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-10-13 13:51:14 +0200
commit89098c7afddc62256d3b969c1fdc452e95360038 (patch)
tree92f71a78e6cd3dab9f058246d2739b2b8c92ea5b /vespaclient-container-plugin
parentc458fcd2c2a35b129763115439ba3d13502b40e0 (diff)
Take 2 of async /document/v1 handler — handler only
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java1153
1 files changed, 846 insertions, 307 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 2754d7ee627..59cf6db43ef 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -2,64 +2,90 @@
package com.yahoo.document.restapi.resource;
import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
import com.google.inject.Inject;
import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.container.core.HandlerMetricContextUtil;
import com.yahoo.container.core.documentapi.VespaDocumentAccess;
+import com.yahoo.container.jdisc.ContentChannelOutputStream;
+import com.yahoo.document.Document;
import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.fieldset.AllFields;
import com.yahoo.document.json.DocumentOperationType;
import com.yahoo.document.json.JsonReader;
import com.yahoo.document.json.JsonWriter;
-import com.yahoo.document.restapi.DocumentOperationExecutor;
-import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
-import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
-import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
-import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
-import com.yahoo.document.restapi.DocumentOperationExecutorImpl;
+import com.yahoo.document.select.parser.ParseException;
+import com.yahoo.documentapi.AsyncParameters;
+import com.yahoo.documentapi.AsyncSession;
+import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.DocumentResponse;
+import com.yahoo.documentapi.DumpVisitorDataHandler;
+import com.yahoo.documentapi.ProgressToken;
+import com.yahoo.documentapi.Result;
+import com.yahoo.documentapi.VisitorControlHandler;
+import com.yahoo.documentapi.VisitorParameters;
+import com.yahoo.documentapi.VisitorSession;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.metrics.DocumentApiMetrics;
import com.yahoo.documentapi.metrics.DocumentOperationStatus;
import com.yahoo.jdisc.Metric;
import com.yahoo.jdisc.Request;
import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.BufferedContentChannel;
import com.yahoo.jdisc.handler.CompletionHandler;
import com.yahoo.jdisc.handler.ContentChannel;
import com.yahoo.jdisc.handler.ReadableContentChannel;
import com.yahoo.jdisc.handler.ResponseHandler;
import com.yahoo.jdisc.handler.UnsafeContentInputStream;
-import com.yahoo.container.core.HandlerMetricContextUtil;
import com.yahoo.jdisc.http.HttpRequest;
import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.metrics.simple.MetricReceiver;
import com.yahoo.restapi.Path;
-import com.yahoo.slime.Cursor;
-import com.yahoo.slime.Inspector;
-import com.yahoo.slime.Slime;
-import com.yahoo.slime.SlimeUtils;
+import com.yahoo.text.Text;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.yolean.Exceptions;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Clock;
+import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
+import java.util.StringJoiner;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.logging.Logger;
+import java.util.stream.Stream;
import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
@@ -71,9 +97,11 @@ import static java.util.Objects.requireNonNull;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Collectors.toUnmodifiableMap;
/**
- * Asynchronous HTTP handler for /document/v1/
+ * Asynchronous HTTP handler for /document/v1
*
* @author jonmv
*/
@@ -86,7 +114,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final CompletionHandler logException = new CompletionHandler() {
@Override public void completed() { }
@Override public void failed(Throwable t) {
- log.log(FINE, () -> "Exception writing or closing response data: " + Exceptions.toMessageString(t));
+ log.log(FINE, "Exception writing or closing response data", t);
}
};
@@ -95,23 +123,35 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override public void close(CompletionHandler handler) { handler.completed(); }
};
+ private static final JsonFactory jsonFactory = new JsonFactory();
+
+ private static final Duration requestTimeout = Duration.ofSeconds(175);
+ private static final Duration visitTimeout = Duration.ofSeconds(120);
+
private static final String CREATE = "create";
private static final String CONDITION = "condition";
- private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get
+ private static final String ROUTE = "route";
private static final String FIELD_SET = "fieldSet";
private static final String SELECTION = "selection";
- private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get
+ private static final String CLUSTER = "cluster";
private static final String CONTINUATION = "continuation";
private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
private static final String CONCURRENCY = "concurrency";
private static final String BUCKET_SPACE = "bucketSpace";
private final Clock clock;
- private final Metric metric; // TODO jonmv: make response class which logs on completion/error
+ private final Metric metric;
private final DocumentApiMetrics metrics;
- private final DocumentOperationExecutor executor;
private final DocumentOperationParser parser;
- private final Map<String, Map<Method, Handler>> handlers;
+ private final long maxThrottled;
+ private final DocumentAccess access;
+ private final AsyncSession asyncSession;
+ private final Map<String, StorageCluster> clusters;
+ private final Deque<Operation> operations;
+ private final AtomicLong enqueued = new AtomicLong();
+ private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
+ private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
public DocumentV1ApiHandler(Metric metric,
@@ -122,24 +162,34 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
AllClustersBucketSpacesConfig bucketSpacesConfig,
DocumentOperationExecutorConfig executorConfig) {
this(Clock.systemUTC(),
- new DocumentOperationExecutorImpl(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, Clock.systemUTC()),
new DocumentOperationParser(documentManagerConfig),
metric,
- metricReceiver);
+ metricReceiver,
+ executorConfig.maxThrottled(),
+ documentAccess,
+ parseClusters(clusterListConfig, bucketSpacesConfig));
}
- DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser,
- Metric metric, MetricReceiver metricReceiver) {
+ DocumentV1ApiHandler(Clock clock, DocumentOperationParser parser, Metric metric, MetricReceiver metricReceiver,
+ int maxThrottled, DocumentAccess access, Map<String, StorageCluster> clusters) {
this.clock = clock;
- this.executor = executor;
this.parser = parser;
this.metric = metric;
this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
- this.handlers = defineApi();
+ this.maxThrottled = maxThrottled;
+ this.access = access;
+ this.asyncSession = access.createAsyncSession(new AsyncParameters());
+ this.clusters = clusters;
+ this.operations = new ConcurrentLinkedDeque<>();
+ this.executor.scheduleWithFixedDelay(this::dispatchEnqueued, 10, 10, TimeUnit.MILLISECONDS); // TODO jonmv: make testable.
}
+ // ------------------------------------------------ Requests -------------------------------------------------
+
@Override
public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) {
+ rawRequest.setTimeout(requestTimeout.toMillis(), TimeUnit.MILLISECONDS);
+
HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass());
ResponseHandler responseHandler = response -> {
HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass());
@@ -147,34 +197,52 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
};
HttpRequest request = (HttpRequest) rawRequest;
- try {
- Path requestPath = new Path(request.getUri());
- for (String path : handlers.keySet())
- if (requestPath.matches(path)) {
- Map<Method, Handler> methods = handlers.get(path);
- if (methods.containsKey(request.getMethod()))
- return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
-
- if (request.getMethod() == OPTIONS)
- return options(methods.keySet(), responseHandler);
+ try {
+ Path requestPath = new Path(request.getUri());
+ for (String path : handlers.keySet())
+ if (requestPath.matches(path)) {
+ Map<Method, Handler> methods = handlers.get(path);
+ if (methods.containsKey(request.getMethod()))
+ return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
+
+ if (request.getMethod() == OPTIONS)
+ options(methods.keySet(), responseHandler);
+
+ methodNotAllowed(request, methods.keySet(), responseHandler);
+ }
+ notFound(request, handlers.keySet(), responseHandler);
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, e, responseHandler);
+ }
+ catch (RuntimeException e) {
+ serverError(request, e, responseHandler);
+ }
+ return ignoredContent;
+ }
- return methodNotAllowed(request, methods.keySet(), responseHandler);
- }
- return notFound(request, handlers.keySet(), responseHandler);
- }
- catch (IllegalArgumentException e) {
- return badRequest(request, e, responseHandler);
- }
- catch (RuntimeException e) {
- return serverError(request, e, responseHandler);
- }
+ @Override
+ public void handleTimeout(Request request, ResponseHandler responseHandler) {
+ timeout((HttpRequest) request, "Request timeout after " + requestTimeout, responseHandler);
}
@Override
public void destroy() {
- this.executor.shutdown();
+ executor.shutdown();
+ visits.values().forEach(VisitorSession::destroy);
+ try {
+ if ( ! executor.awaitTermination(10, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ if ( ! executor.awaitTermination(10, TimeUnit.SECONDS))
+ log.log(WARNING, "Failed shutting down /document/v1 executor within 20 seconds");
+ }
+ }
+ catch (InterruptedException e) {
+ log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down");
+ }
}
+ /** Defines all paths/methods handled by this handler. */
private Map<String, Map<Method, Handler>> defineApi() {
Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
@@ -212,343 +280,354 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler));
+ enqueue(request, handler, () -> {
+ VisitorOptions options = parseOptions(request, path).build();
+ return () -> {
+ visit(request, options, handler);
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
return ignoredContent;
}
private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- VisitorOptions.Builder options = parseOptions(request, path);
- options = options.documentType(path.documentType());
- options = options.namespace(path.namespace());
- options = path.group().map(options::group).orElse(options);
- executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler));
+ enqueue(request, handler, () -> {
+ VisitorOptions.Builder optionsBuilder = parseOptions(request, path);
+ optionsBuilder = optionsBuilder.documentType(path.documentType());
+ optionsBuilder = optionsBuilder.namespace(path.namespace());
+ optionsBuilder = path.group().map(optionsBuilder::group).orElse(optionsBuilder);
+ VisitorOptions options = optionsBuilder.build();
+ return () -> {
+ visit(request, options, handler);
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
return ignoredContent;
}
- private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) {
- Object monitor = new Object();
- return new VisitOperationsContext((type, message) -> {
- synchronized (monitor) {
- handleError(request, type, message, root, handler);
- }
- },
- token -> {
- token.ifPresent(value -> root.setString("continuation", value));
- synchronized (monitor) {
- respond(root, handler);
- }
- },
- // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway
- document -> {
- try {
- synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization.
- SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(),
- documents.addObject());
- }
- }
- // TODO jonmv: This shouldn't happen much, but ... expose errors too?
- catch (RuntimeException e) {
- log.log(WARNING, "Exception serializing document in document/v1 visit response", e);
- }
- });
- }
-
private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
- DocumentId id = path.id();
- DocumentOperationParameters parameters = parameters();
- parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters);
- parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters);
- executor.get(id,
- parameters,
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- document -> {
- try {
- Cursor root = responseRoot(request, id);
- document.map(JsonWriter::toByteArray)
- .map(SlimeUtils::jsonToSlime)
- .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields")));
- respond(document.isPresent() ? 200 : 404,
- root,
- handler);
- }
- catch (Exception e) {
- serverError(request, new RuntimeException(e), handler);
- }
- }));
+ enqueue(request, handler, () -> {
+ DocumentOperationParameters rawParameters = parameters();
+ rawParameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).route())
+ .map(rawParameters::withRoute)
+ .orElse(rawParameters);
+ rawParameters = getProperty(request, FIELD_SET).map(rawParameters::withFieldSet)
+ .orElse(rawParameters);
+ DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> {
+ handle(path, handler, response, (document, jsonResponse) -> {
+ if (document != null) {
+ jsonResponse.writeSingleDocument(document);
+ jsonResponse.commit(Response.Status.OK);
+ }
+ else
+ jsonResponse.commit(Response.Status.NOT_FOUND);
+ });
+ });
+ return () -> dispatchOperation(request, handler, () -> asyncSession.get(path.id(), parameters));
+ });
return ignoredContent;
}
private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant());
return new ForwardingContentChannel(in -> {
- try {
- DocumentPut put = parser.parsePut(in, id.toString());
+ enqueue(request, handler, () -> {
+ DocumentPut put = parser.parsePut(in, path.id().toString());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
- executor.put(put,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- }
- catch (IllegalArgumentException e) {
- badRequest(request, e, handler);
- }
- catch (RuntimeException e) {
- serverError(request, e, handler);
- }
+ DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
+ .orElse(parameters());
+ DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response));
+ return () -> dispatchOperation(request, handler, () -> asyncSession.put(put, parameters));
+ });
});
}
private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
- ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.UPDATE, clock.instant());
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant());
return new ForwardingContentChannel(in -> {
- try {
- DocumentUpdate update = parser.parseUpdate(in, id.toString());
+ enqueue(request, handler, () -> {
+ DocumentUpdate update = parser.parseUpdate(in, path.id().toString());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
- executor.update(update,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
- }
- catch (IllegalArgumentException e) {
- badRequest(request, e, handler);
- }
- catch (RuntimeException e) {
- serverError(request, e, handler);
- }
+ DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
+ .orElse(parameters());
+ DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response));
+ return () -> dispatchOperation(request, handler, () -> asyncSession.update(update, parameters));
+ });
});
}
private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
- DocumentId id = path.id();
ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.REMOVE, clock.instant());
- executor.remove(id,
- getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
- new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
- __ -> respond(responseRoot(request, id), handler)));
+ enqueue(request, handler, () -> {
+ DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
+ .orElse(parameters());
+ DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response));
+ return () -> dispatchOperation(request, handler, () -> asyncSession.remove(path.id(), parameters));
+ });
return ignoredContent;
}
- private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) {
- switch (type) {
- case BAD_REQUEST:
- badRequest(request, message, root, handler);
- break;
- case NOT_FOUND:
- notFound(request, message, root, handler);
- break;
- case PRECONDITION_FAILED:
- preconditionFailed(request, message, root, handler);
- break;
- case OVERLOAD:
- overload(request, message, root, handler);
- break;
- case TIMEOUT:
- timeout(request, message, root, handler);
- break;
- case INSUFFICIENT_STORAGE:
- insufficientStorage(request, message, root, handler);
- break;
- default:
- log.log(WARNING, "Unexpected error type '" + type + "'");
- case ERROR: // intentional fallthrough
- serverError(request, message, root, handler);
+ /** Dispatches enqueued requests until one is blocked. */
+ private void dispatchEnqueued() {
+ try {
+ while (dispatch());
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
}
}
- // ------------------------------------------------ Responses ------------------------------------------------
-
- private static Cursor responseRoot(HttpRequest request) {
- Cursor root = new Slime().setObject();
- root.setString("pathId", request.getUri().getRawPath());
- return root;
- }
-
- private static Cursor responseRoot(HttpRequest request, DocumentId id) {
- Cursor root = responseRoot(request);
- root.setString("id", id.toString());
- return root;
- }
-
- private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) {
- Response response = new Response(Response.Status.NO_CONTENT);
- response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
- handler.handleResponse(response).close(logException);
- return ignoredContent;
- }
-
- private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
- return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler);
- }
-
- private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.BAD_REQUEST, root, handler);
- }
+ /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */
+ private boolean dispatch() {
+ Operation operation = operations.poll();
+ if (operation == null)
+ return false;
- private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
- return notFound(request,
- "Nothing at '" + request.getUri().getRawPath() + "'. " +
- "Available paths are:\n" + String.join("\n", paths),
- responseRoot(request),
- handler);
+ if (operation.dispatch()) {
+ enqueued.decrementAndGet();
+ return true;
+ }
+ operations.push(operation);
+ return false;
}
- private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- root.setString("message", message);
- return respond(Response.Status.NOT_FOUND, root, handler);
+ /** Enqueues the given request and operation, or responds with "overload" if the queue is full. */
+ private void enqueue(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) {
+ if (enqueued.incrementAndGet() > maxThrottled) {
+ enqueued.decrementAndGet();
+ overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler);
+ return;
+ }
+ operations.offer(Operation.lazilyParsed(request, handler, operationParser));
}
- private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
- Cursor root = responseRoot(request);
- root.setString("message",
- "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
- "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")));
- return respond(Response.Status.METHOD_NOT_ALLOWED,
- root,
- handler);
+ @FunctionalInterface
+ interface Handler {
+ ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
}
- private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- root.setString("message", message);
- return respond(Response.Status.PRECONDITION_FAILED, root, handler);
- }
+ // ------------------------------------------------ Responses ------------------------------------------------
- private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.TOO_MANY_REQUESTS, root, handler);
- }
+ /** Class for writing and returning JSON responses to document operations in a thread safe manner. */
+ private static class JsonResponse implements AutoCloseable {
- private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) {
- log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e);
- Cursor root = responseRoot(request);
- root.setString("message", Exceptions.toMessageString(e));
- return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
- }
+ private final BufferedContentChannel buffer = new BufferedContentChannel();
+ private final OutputStream out = new ContentChannelOutputStream(buffer);
+ private final JsonGenerator json = jsonFactory.createGenerator(out);
+ private final ResponseHandler handler;
+ private ContentChannel channel;
- private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
- }
+ private JsonResponse(ResponseHandler handler) throws IOException {
+ this.handler = handler;
+ json.writeStartObject();
+ }
- private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.GATEWAY_TIMEOUT, root, handler);
- }
+ /** Creates a new JsonResponse with path and id fields written. */
+ static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException {
+ JsonResponse response = new JsonResponse(handler);
+ response.writePathId(path.rawPath());
+ response.writeDocId(path.id());
+ return response;
+ }
- private static ContentChannel insufficientStorage(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
- log.log(FINE, () -> "Insufficient storage for " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
- root.setString("message", message);
- return respond(Response.Status.INSUFFICIENT_STORAGE, root, handler);
- }
+ /** Creates a new JsonResponse with path field written. */
+ static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException {
+ JsonResponse response = new JsonResponse(handler);
+ response.writePathId(request.getUri().getRawPath());
+ return response;
+ }
- private static ContentChannel respond(Inspector root, ResponseHandler handler) {
- return respond(200, root, handler);
- }
+ /** Creates a new JsonResponse with path and message fields written. */
+ static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException {
+ JsonResponse response = new JsonResponse(handler);
+ response.writePathId(request.getUri().getRawPath());
+ response.writeMessage(message);
+ return response;
+ }
- private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) {
- Response response = new Response(status);
- response.headers().put("Content-Type", "application/json; charset=UTF-8");
- ContentChannel out = null;
- try {
- out = handler.handleResponse(response);
- out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException);
+ /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */
+ synchronized void commit(int status) throws IOException {
+ Response response = new Response(status);
+ response.headers().addAll(Map.of("Content-Type", List.of("application/json; charset=UTF-8")));
+ try {
+ channel = handler.handleResponse(response);
+ buffer.connectTo(channel);
+ }
+ catch (RuntimeException e) {
+ throw new IOException(e);
+ }
}
- catch (Exception e) {
- log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e));
+
+ /** Commits a response with the given status code and some default headers, writes buffered content, and closes this. */
+ synchronized void respond(int status) throws IOException {
+ try (this) {
+ commit(status);
+ }
}
- finally {
- if (out != null) try {
- out.close(logException);
+
+ /** Closes the JSON and the output content channel of this. */
+ @Override
+ public synchronized void close() throws IOException {
+ try {
+ if (channel == null) {
+ log.log(WARNING, "Close called before response was committed, in " + getClass().getName());
+ commit(Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ json.close(); // Also closes object and array scopes.
+ out.close(); // Simply flushes the output stream.
}
- catch (Exception e) {
- log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e));
+ finally {
+ if (channel != null)
+ channel.close(logException); // Closes the response handler's content channel.
}
}
- return ignoredContent;
- }
- // ------------------------------------------------ Helpers ------------------------------------------------
+ synchronized void writePathId(String path) throws IOException {
+ json.writeStringField("pathId", path);
+ }
- private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
- VisitorOptions.Builder options = VisitorOptions.builder();
+ synchronized void writeMessage(String message) throws IOException {
+ json.writeStringField("message", message);
+ }
- getProperty(request, SELECTION).ifPresent(options::selection);
- getProperty(request, CONTINUATION).ifPresent(options::continuation);
- getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
- getProperty(request, CLUSTER).ifPresent(options::cluster);
- getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
- getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
- .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
- getProperty(request, CONCURRENCY, numberParser)
- .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
+ synchronized void writeDocId(DocumentId id) throws IOException {
+ json.writeStringField("id", id.toString());
+ }
- return options;
- }
+ synchronized void writeSingleDocument(Document document) throws IOException {
+ new JsonWriter(json).writeFields(document);
+ }
- static class DocumentPath {
+ synchronized void writeDocumentsArrayStart() throws IOException {
+ json.writeArrayFieldStart("documents");
+ }
- private final Path path;
- private final Optional<Group> group;
+ synchronized void writeDocumentValue(Document document) throws IOException {
+ new JsonWriter(json).write(document);
+ }
- DocumentPath(Path path) {
- this.path = requireNonNull(path);
- this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
- .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
+ synchronized void writeArrayEnd() throws IOException {
+ json.writeEndArray();
}
- DocumentId id() {
- return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
- ":" + requireNonNull(path.get("documentType")) +
- ":" + group.map(Group::docIdPart).orElse("") +
- ":" + requireNonNull(path.get("docid")));
+ synchronized void writeContinuation(String token) throws IOException {
+ json.writeStringField("continuation", token);
}
- String documentType() { return requireNonNull(path.get("documentType")); }
- String namespace() { return requireNonNull(path.get("namespace")); }
- Optional<Group> group() { return group; }
+ }
+ private static void options(Collection<Method> methods, ResponseHandler handler) {
+ loggingException(() -> {
+ Response response = new Response(Response.Status.NO_CONTENT);
+ response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
+ handler.handleResponse(response).close(logException);
+ });
}
- private static Optional<String> getProperty(HttpRequest request, String name) {
- List<String> values = request.parameters().get(name);
- if (values != null && values.size() != 0)
- return Optional.ofNullable(values.get(values.size() - 1));
+ private static void badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
+ loggingException(() -> {
+ String message = Exceptions.toMessageString(e);
+ log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
+ JsonResponse.create(request, message, handler).respond(Response.Status.BAD_REQUEST);
+ });
+ }
- return Optional.empty();
+ private static void notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
+ loggingException(() -> {
+ JsonResponse.create(request,
+ "Nothing at '" + request.getUri().getRawPath() + "'. " +
+ "Available paths are:\n" + String.join("\n", paths),
+ handler)
+ .respond(Response.Status.NOT_FOUND);
+ });
}
- private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
- return getProperty(request, name).map(parser::parse);
+ private static void methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
+ loggingException(() -> {
+ JsonResponse.create(request,
+ "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
+ "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")),
+ handler)
+ .respond(Response.Status.METHOD_NOT_ALLOWED);
+ });
}
+ private static void overload(HttpRequest request, String message, ResponseHandler handler) {
+ loggingException(() -> {
+ log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ JsonResponse.create(request, message, handler).respond(Response.Status.TOO_MANY_REQUESTS);
+ });
+ }
- @FunctionalInterface
- interface Parser<T> extends Function<String, T> {
- default T parse(String value) {
- try {
- return apply(value);
- }
- catch (RuntimeException e) {
- throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
- }
+ private static void serverError(HttpRequest request, Throwable t, ResponseHandler handler) {
+ loggingException(() -> {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", t);
+ JsonResponse.create(request, Exceptions.toMessageString(t), handler).respond(Response.Status.INTERNAL_SERVER_ERROR);
+ });
+ }
+
+ private static void timeout(HttpRequest request, String message, ResponseHandler handler) {
+ loggingException(() -> {
+ log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ JsonResponse.create(request, message, handler).respond(Response.Status.GATEWAY_TIMEOUT);
+ });
+ }
+
+ private static void loggingException(Exceptions.RunnableThrowingIOException runnable) {
+ try {
+ runnable.run();
+ }
+ catch (Exception e) {
+ log.log(FINE, "Failed writing response", e);
}
}
+ // ---------------------------------------------Document Operations ----------------------------------------
@FunctionalInterface
- interface Handler {
- ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
+ interface Operation {
+
+ /**
+ * Attempts to dispatch this operation to the document API, and returns whether this completed or not.
+ * This return {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if
+ * dispatch should be retried at a later time.
+ */
+ boolean dispatch();
+
+ /** Wraps the operation parser in an Operation that is parsed the first time it is attempted dispatched. */
+ static Operation lazilyParsed(HttpRequest request, ResponseHandler handler, Supplier<Operation> parser) {
+ AtomicReference<Operation> operation = new AtomicReference<>();
+ return () -> {
+ try {
+ operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch();
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, e, handler);
+ }
+ catch (RuntimeException e) {
+ serverError(request, e, handler);
+ }
+ return true;
+ };
+ }
+
}
+ /** Attempts to send the given document operation, returning false if thes needs to be retried. */
+ private static boolean dispatchOperation(HttpRequest request, ResponseHandler handler, Supplier<Result> documentOperation) {
+ if (request.isCancelled())
+ return true;
+
+ Result result = documentOperation.get();
+ if (result.type() == Result.ResultType.TRANSIENT_ERROR)
+ return false;
+
+ if (result.type() == Result.ResultType.FATAL_ERROR)
+ serverError(request, result.getError(), handler);
+
+ return true;
+ }
/** Readable content channel which forwards data to a reader when closed. */
static class ForwardingContentChannel implements ContentChannel {
@@ -589,11 +668,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
-
static class DocumentOperationParser {
- private static final JsonFactory jsonFactory = new JsonFactory();
-
private final DocumentTypeManager manager;
DocumentOperationParser(DocumentmanagerConfig config) {
@@ -614,6 +690,334 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
+ interface SuccessCallback {
+ void onSuccess(Document document, JsonResponse response) throws IOException;
+ }
+
+ private static void handle(DocumentPath path, ResponseHandler handler, com.yahoo.documentapi.Response response, SuccessCallback callback) {
+ try (JsonResponse jsonResponse = JsonResponse.create(path, handler)) {
+ if (response.isSuccess())
+ callback.onSuccess((response instanceof DocumentResponse) ? ((DocumentResponse) response).getDocument() : null, jsonResponse);
+ else {
+ jsonResponse.writeMessage(response.getTextMessage());
+ switch (response.outcome()) {
+ case NOT_FOUND:
+ jsonResponse.commit(Response.Status.NOT_FOUND);
+ break;
+ case CONDITION_FAILED:
+ jsonResponse.commit(Response.Status.PRECONDITION_FAILED);
+ break;
+ case INSUFFICIENT_STORAGE:
+ log.log(WARNING, "Insufficient storage left in cluster: " + response.getTextMessage());
+ jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE);
+ break;
+ default:
+ log.log(WARNING, "Unexpected document API operation outcome '" + response.outcome() + "'");
+ case ERROR:
+ log.log(WARNING, "Exception performing document operation: " + response.getTextMessage());
+ jsonResponse.commit(Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+ }
+ catch (Exception e) {
+ log.log(FINE, "Failed writing response", e);
+ }
+ }
+
+ private static void handle(DocumentPath path, ResponseHandler handler, com.yahoo.documentapi.Response response) {
+ handle(path, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK));
+ }
+
+ // ------------------------------------------------- Visits ------------------------------------------------
+
+ private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
+ VisitorOptions.Builder options = VisitorOptions.builder();
+
+ getProperty(request, SELECTION).ifPresent(options::selection);
+ getProperty(request, CONTINUATION).ifPresent(options::continuation);
+ getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
+ getProperty(request, CLUSTER).ifPresent(options::cluster);
+ getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
+ getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
+ .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
+ getProperty(request, CONCURRENCY, numberParser)
+ .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
+
+ return options;
+ }
+
+ private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) {
+ if (options.cluster.isEmpty() && options.documentType.isEmpty())
+ throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level");
+
+ VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection,
+ options.documentType,
+ options.namespace.map(value -> "id.namespace=='" + value + "'"),
+ options.group.map(Group::selection))
+ .flatMap(Optional::stream)
+ .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right
+ StringJoiner::add,
+ StringJoiner::merge)
+ .toString());
+
+ options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken);
+ parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME)));
+ options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); });
+ parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10));
+ options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); });
+ parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1)));
+ parameters.setTimeoutMs(visitTimeout.toMillis());
+ parameters.visitInconsistentBuckets(true);
+ parameters.setPriority(DocumentProtocol.Priority.NORMAL_4);
+
+ StorageCluster storageCluster = resolveCluster(options.cluster, clusters);
+ parameters.setRoute(storageCluster.route());
+ parameters.setBucketSpace(resolveBucket(storageCluster,
+ options.documentType,
+ List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
+ options.bucketSpace));
+
+ return parameters;
+ }
+
+ private void visit(HttpRequest request, VisitorOptions options, ResponseHandler handler) {
+ try {
+ JsonResponse response = JsonResponse.create(request, handler);
+ response.writeDocumentsArrayStart();
+ CountDownLatch latch = new CountDownLatch(1);
+ VisitorParameters parameters = asParameters(options, clusters, visitTimeout);
+ parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
+ @Override public void onDocument(Document doc, long timeStamp) {
+ loggingException(() -> {
+ response.writeDocumentValue(doc);
+ });
+ }
+ @Override public void onRemove(DocumentId id) { } // We don't visit removes.
+ });
+ parameters.setControlHandler(new VisitorControlHandler() {
+ @Override public void onDone(CompletionCode code, String message) {
+ super.onDone(code, message);
+ loggingException(() -> {
+ response.writeArrayEnd(); // Close "documents" array.
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets()) {
+ response.writeMessage("No buckets visited within timeout of " + visitTimeout);
+ response.commit(Response.Status.GATEWAY_TIMEOUT);
+ break;
+ }
+ case SUCCESS: // Intentional fallthrough.
+ case ABORTED: // Intentional fallthrough.
+ if (getProgress() != null && ! getProgress().isFinished())
+ response.writeContinuation(getProgress().serializeToString());
+
+ response.commit(Response.Status.OK);
+ break;
+ default:
+ response.writeMessage(message != null ? message : "Visiting failed");
+ response.commit(Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ executor.execute(() -> {
+ try {
+ latch.await(); // We may get here while dispatching thread is still putting us in the map.
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ visits.get(this).destroy();
+ });
+ });
+ }
+ });
+ visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
+ latch.countDown();
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, e, handler);
+ }
+ catch (ParseException e) {
+ badRequest(request, new IllegalArgumentException(e), handler);
+ }
+ catch (RuntimeException e) {
+ serverError(request, e, handler);
+ }
+ catch (Exception e) {
+ log.log(FINE, "Failed writing response", e);
+ }
+ }
+
+ // TODO jonmv: Inline this class.
+ static class VisitorOptions {
+
+ final Optional<String> cluster;
+ final Optional<String> namespace;
+ final Optional<String> documentType;
+ final Optional<Group> group;
+ final Optional<String> selection;
+ final Optional<String> fieldSet;
+ final Optional<String> continuation;
+ final Optional<String> bucketSpace;
+ final Optional<Integer> wantedDocumentCount;
+ final Optional<Integer> concurrency;
+
+ private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace,
+ Optional<Group> group, Optional<String> selection, Optional<String> fieldSet,
+ Optional<String> continuation, Optional<String> bucketSpace,
+ Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) {
+ this.cluster = cluster;
+ this.namespace = namespace;
+ this.documentType = documentType;
+ this.group = group;
+ this.selection = selection;
+ this.fieldSet = fieldSet;
+ this.continuation = continuation;
+ this.bucketSpace = bucketSpace;
+ this.wantedDocumentCount = wantedDocumentCount;
+ this.concurrency = concurrency;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ VisitorOptions that = (VisitorOptions) o;
+ return cluster.equals(that.cluster) &&
+ namespace.equals(that.namespace) &&
+ documentType.equals(that.documentType) &&
+ group.equals(that.group) &&
+ selection.equals(that.selection) &&
+ fieldSet.equals(that.fieldSet) &&
+ continuation.equals(that.continuation) &&
+ bucketSpace.equals(that.bucketSpace) &&
+ wantedDocumentCount.equals(that.wantedDocumentCount) &&
+ concurrency.equals(that.concurrency);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency);
+ }
+
+ @Override
+ public String toString() {
+ return "VisitorOptions{" +
+ "cluster=" + cluster +
+ ", namespace=" + namespace +
+ ", documentType=" + documentType +
+ ", group=" + group +
+ ", selection=" + selection +
+ ", fieldSet=" + fieldSet +
+ ", continuation=" + continuation +
+ ", bucketSpace=" + bucketSpace +
+ ", wantedDocumentCount=" + wantedDocumentCount +
+ ", concurrency=" + concurrency +
+ '}';
+ }
+
+ public static Builder builder() { return new Builder(); }
+
+
+ public static class Builder {
+
+ private String cluster;
+ private String documentType;
+ private String namespace;
+ private Group group;
+ private String selection;
+ private String fieldSet;
+ private String continuation;
+ private String bucketSpace;
+ private Integer wantedDocumentCount;
+ private Integer concurrency;
+
+ public Builder cluster(String cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+
+ public Builder documentType(String documentType) {
+ this.documentType = documentType;
+ return this;
+ }
+
+ public Builder namespace(String namespace) {
+ this.namespace = namespace;
+ return this;
+ }
+
+ public Builder group(Group group) {
+ this.group = group;
+ return this;
+ }
+
+ public Builder selection(String selection) {
+ this.selection = selection;
+ return this;
+ }
+
+ public Builder fieldSet(String fieldSet) {
+ this.fieldSet = fieldSet;
+ return this;
+ }
+
+ public Builder continuation(String continuation) {
+ this.continuation = continuation;
+ return this;
+ }
+
+ public Builder bucketSpace(String bucketSpace) {
+ this.bucketSpace = bucketSpace;
+ return this;
+ }
+
+ public Builder wantedDocumentCount(Integer wantedDocumentCount) {
+ this.wantedDocumentCount = wantedDocumentCount;
+ return this;
+ }
+
+ public Builder concurrency(Integer concurrency) {
+ this.concurrency = concurrency;
+ return this;
+ }
+
+ public VisitorOptions build() {
+ return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType),
+ Optional.ofNullable(namespace), Optional.ofNullable(group),
+ Optional.ofNullable(selection), Optional.ofNullable(fieldSet),
+ Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace),
+ Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency));
+ }
+
+ }
+
+ }
+
+ // ------------------------------------------------ Helpers ------------------------------------------------
+
+ private static Optional<String> getProperty(HttpRequest request, String name) {
+ List<String> values = request.parameters().get(name);
+ if (values != null && values.size() != 0)
+ return Optional.ofNullable(values.get(values.size() - 1));
+
+ return Optional.empty();
+ }
+
+ private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
+ return getProperty(request, name).map(parser::parse);
+ }
+
+ @FunctionalInterface
+ interface Parser<T> extends Function<String, T> {
+ default T parse(String value) {
+ try {
+ return apply(value);
+ }
+ catch (RuntimeException e) {
+ throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
+ }
+ }
+ }
+
private class MeasuringResponseHandler implements ResponseHandler {
private final ResponseHandler delegate;
@@ -638,4 +1042,139 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
+ static class StorageCluster {
+
+ private final String name;
+ private final String configId;
+ private final Map<String, String> documentBuckets;
+
+ StorageCluster(String name, String configId, Map<String, String> documentBuckets) {
+ this.name = requireNonNull(name);
+ this.configId = requireNonNull(configId);
+ this.documentBuckets = Map.copyOf(documentBuckets);
+ }
+
+ String name() { return name; }
+ String configId() { return configId; }
+ String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; }
+ Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); }
+
+ }
+
+ private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) {
+ return clusters.storage().stream()
+ .collect(toUnmodifiableMap(storage -> storage.name(),
+ storage -> new StorageCluster(storage.name(),
+ storage.configid(),
+ buckets.cluster(storage.name())
+ .documentType().entrySet().stream()
+ .collect(toMap(entry -> entry.getKey(),
+ entry -> entry.getValue().bucketSpace())))));
+ }
+
+ static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) {
+ if (clusters.isEmpty())
+ throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled");
+
+ return wanted.map(cluster -> {
+ if ( ! clusters.containsKey(cluster))
+ throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.get(cluster);
+ }).orElseGet(() -> {
+ if (clusters.size() > 1)
+ throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" +
+ String.join("', '", clusters.keySet()) + "'");
+
+ return clusters.values().iterator().next();
+ });
+ }
+
+ private static String resolveBucket(StorageCluster cluster, Optional<String> documentType,
+ List<String> bucketSpaces, Optional<String> bucketSpace) {
+ return documentType.map(type -> cluster.bucketOf(type)
+ .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() +
+ "' is not mapped to a known bucket space")))
+ .or(() -> bucketSpace.map(space -> {
+ if ( ! bucketSpaces.contains(space))
+ throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " +
+ String.join(", ", bucketSpaces));
+ return space;
+ }))
+ .orElse(FixedBucketSpaces.defaultSpace());
+ }
+
+ private static class DocumentPath {
+
+ private final Path path;
+ private final Optional<Group> group;
+
+ DocumentPath(Path path) {
+ this.path = requireNonNull(path);
+ this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
+ .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
+ }
+
+ DocumentId id() {
+ return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
+ ":" + requireNonNull(path.get("documentType")) +
+ ":" + group.map(Group::docIdPart).orElse("") +
+ ":" + requireNonNull(path.get("docid")));
+ }
+
+ String rawPath() { return path.asString(); }
+ String documentType() { return requireNonNull(path.get("documentType")); }
+ String namespace() { return requireNonNull(path.get("namespace")); }
+ Optional<Group> group() { return group; }
+
+ }
+
+ static class Group {
+
+ private final String value;
+ private final String docIdPart;
+ private final String selection;
+
+ private Group(String value, String docIdPart, String selection) {
+ Text.validateTextString(value)
+ .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); });
+ this.value = value;
+ this.docIdPart = docIdPart;
+ this.selection = selection;
+ }
+
+ public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); }
+ public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); }
+
+ public String value() { return value; }
+ public String docIdPart() { return docIdPart; }
+ public String selection() { return selection; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Group group = (Group) o;
+ return value.equals(group.value) &&
+ docIdPart.equals(group.docIdPart) &&
+ selection.equals(group.selection);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value, docIdPart, selection);
+ }
+
+ @Override
+ public String toString() {
+ return "Group{" +
+ "value='" + value + '\'' +
+ ", docIdPart='" + docIdPart + '\'' +
+ ", selection='" + selection + '\'' +
+ '}';
+ }
+
+ }
+
}