diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-10-13 13:51:14 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-10-13 13:51:14 +0200 |
commit | 89098c7afddc62256d3b969c1fdc452e95360038 (patch) | |
tree | 92f71a78e6cd3dab9f058246d2739b2b8c92ea5b /vespaclient-container-plugin | |
parent | c458fcd2c2a35b129763115439ba3d13502b40e0 (diff) |
Take 2 of async /document/v1 handler — handler only
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 1153 |
1 files changed, 846 insertions, 307 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 2754d7ee627..59cf6db43ef 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -2,64 +2,90 @@ package com.yahoo.document.restapi.resource; import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import com.google.inject.Inject; import com.yahoo.cloud.config.ClusterListConfig; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.container.core.documentapi.VespaDocumentAccess; +import com.yahoo.container.jdisc.ContentChannelOutputStream; +import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.document.fieldset.AllFields; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; import com.yahoo.document.json.JsonWriter; -import com.yahoo.document.restapi.DocumentOperationExecutor; -import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType; -import com.yahoo.document.restapi.DocumentOperationExecutor.Group; -import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext; -import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext; -import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions; import com.yahoo.document.restapi.DocumentOperationExecutorConfig; -import com.yahoo.document.restapi.DocumentOperationExecutorImpl; +import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.AsyncParameters; +import com.yahoo.documentapi.AsyncSession; +import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.DocumentOperationParameters; +import com.yahoo.documentapi.DocumentResponse; +import com.yahoo.documentapi.DumpVisitorDataHandler; +import com.yahoo.documentapi.ProgressToken; +import com.yahoo.documentapi.Result; +import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorParameters; +import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.Response; import com.yahoo.jdisc.handler.AbstractRequestHandler; +import com.yahoo.jdisc.handler.BufferedContentChannel; import com.yahoo.jdisc.handler.CompletionHandler; import com.yahoo.jdisc.handler.ContentChannel; import com.yahoo.jdisc.handler.ReadableContentChannel; import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.handler.UnsafeContentInputStream; -import com.yahoo.container.core.HandlerMetricContextUtil; import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.HttpRequest.Method; +import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.metrics.simple.MetricReceiver; import com.yahoo.restapi.Path; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Inspector; -import com.yahoo.slime.Slime; -import com.yahoo.slime.SlimeUtils; +import com.yahoo.text.Text; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import com.yahoo.yolean.Exceptions; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.time.Clock; +import java.time.Duration; import java.time.Instant; import java.util.Collection; import java.util.Collections; +import java.util.Deque; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.StringJoiner; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.logging.Logger; +import java.util.stream.Stream; import static com.yahoo.documentapi.DocumentOperationParameters.parameters; import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE; @@ -71,9 +97,11 @@ import static java.util.Objects.requireNonNull; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toUnmodifiableMap; /** - * Asynchronous HTTP handler for /document/v1/ + * Asynchronous HTTP handler for /document/v1 * * @author jonmv */ @@ -86,7 +114,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final CompletionHandler logException = new CompletionHandler() { @Override public void completed() { } @Override public void failed(Throwable t) { - log.log(FINE, () -> "Exception writing or closing response data: " + Exceptions.toMessageString(t)); + log.log(FINE, "Exception writing or closing response data", t); } }; @@ -95,23 +123,35 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void close(CompletionHandler handler) { handler.completed(); } }; + private static final JsonFactory jsonFactory = new JsonFactory(); + + private static final Duration requestTimeout = Duration.ofSeconds(175); + private static final Duration visitTimeout = Duration.ofSeconds(120); + private static final String CREATE = "create"; private static final String CONDITION = "condition"; - private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get + private static final String ROUTE = "route"; private static final String FIELD_SET = "fieldSet"; private static final String SELECTION = "selection"; - private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get + private static final String CLUSTER = "cluster"; private static final String CONTINUATION = "continuation"; private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount"; private static final String CONCURRENCY = "concurrency"; private static final String BUCKET_SPACE = "bucketSpace"; private final Clock clock; - private final Metric metric; // TODO jonmv: make response class which logs on completion/error + private final Metric metric; private final DocumentApiMetrics metrics; - private final DocumentOperationExecutor executor; private final DocumentOperationParser parser; - private final Map<String, Map<Method, Handler>> handlers; + private final long maxThrottled; + private final DocumentAccess access; + private final AsyncSession asyncSession; + private final Map<String, StorageCluster> clusters; + private final Deque<Operation> operations; + private final AtomicLong enqueued = new AtomicLong(); + private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); + private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject public DocumentV1ApiHandler(Metric metric, @@ -122,24 +162,34 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { AllClustersBucketSpacesConfig bucketSpacesConfig, DocumentOperationExecutorConfig executorConfig) { this(Clock.systemUTC(), - new DocumentOperationExecutorImpl(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, Clock.systemUTC()), new DocumentOperationParser(documentManagerConfig), metric, - metricReceiver); + metricReceiver, + executorConfig.maxThrottled(), + documentAccess, + parseClusters(clusterListConfig, bucketSpacesConfig)); } - DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser, - Metric metric, MetricReceiver metricReceiver) { + DocumentV1ApiHandler(Clock clock, DocumentOperationParser parser, Metric metric, MetricReceiver metricReceiver, + int maxThrottled, DocumentAccess access, Map<String, StorageCluster> clusters) { this.clock = clock; - this.executor = executor; this.parser = parser; this.metric = metric; this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1"); - this.handlers = defineApi(); + this.maxThrottled = maxThrottled; + this.access = access; + this.asyncSession = access.createAsyncSession(new AsyncParameters()); + this.clusters = clusters; + this.operations = new ConcurrentLinkedDeque<>(); + this.executor.scheduleWithFixedDelay(this::dispatchEnqueued, 10, 10, TimeUnit.MILLISECONDS); // TODO jonmv: make testable. } + // ------------------------------------------------ Requests ------------------------------------------------- + @Override public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) { + rawRequest.setTimeout(requestTimeout.toMillis(), TimeUnit.MILLISECONDS); + HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass()); ResponseHandler responseHandler = response -> { HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass()); @@ -147,34 +197,52 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }; HttpRequest request = (HttpRequest) rawRequest; - try { - Path requestPath = new Path(request.getUri()); - for (String path : handlers.keySet()) - if (requestPath.matches(path)) { - Map<Method, Handler> methods = handlers.get(path); - if (methods.containsKey(request.getMethod())) - return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler); - - if (request.getMethod() == OPTIONS) - return options(methods.keySet(), responseHandler); + try { + Path requestPath = new Path(request.getUri()); + for (String path : handlers.keySet()) + if (requestPath.matches(path)) { + Map<Method, Handler> methods = handlers.get(path); + if (methods.containsKey(request.getMethod())) + return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler); + + if (request.getMethod() == OPTIONS) + options(methods.keySet(), responseHandler); + + methodNotAllowed(request, methods.keySet(), responseHandler); + } + notFound(request, handlers.keySet(), responseHandler); + } + catch (IllegalArgumentException e) { + badRequest(request, e, responseHandler); + } + catch (RuntimeException e) { + serverError(request, e, responseHandler); + } + return ignoredContent; + } - return methodNotAllowed(request, methods.keySet(), responseHandler); - } - return notFound(request, handlers.keySet(), responseHandler); - } - catch (IllegalArgumentException e) { - return badRequest(request, e, responseHandler); - } - catch (RuntimeException e) { - return serverError(request, e, responseHandler); - } + @Override + public void handleTimeout(Request request, ResponseHandler responseHandler) { + timeout((HttpRequest) request, "Request timeout after " + requestTimeout, responseHandler); } @Override public void destroy() { - this.executor.shutdown(); + executor.shutdown(); + visits.values().forEach(VisitorSession::destroy); + try { + if ( ! executor.awaitTermination(10, TimeUnit.SECONDS)) { + executor.shutdownNow(); + if ( ! executor.awaitTermination(10, TimeUnit.SECONDS)) + log.log(WARNING, "Failed shutting down /document/v1 executor within 20 seconds"); + } + } + catch (InterruptedException e) { + log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down"); + } } + /** Defines all paths/methods handled by this handler. */ private Map<String, Map<Method, Handler>> defineApi() { Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>(); @@ -212,343 +280,354 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) { - Cursor root = responseRoot(request); - executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler)); + enqueue(request, handler, () -> { + VisitorOptions options = parseOptions(request, path).build(); + return () -> { + visit(request, options, handler); + return true; // VisitorSession has its own throttle handling. + }; + }); return ignoredContent; } private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) { - Cursor root = responseRoot(request); - VisitorOptions.Builder options = parseOptions(request, path); - options = options.documentType(path.documentType()); - options = options.namespace(path.namespace()); - options = path.group().map(options::group).orElse(options); - executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler)); + enqueue(request, handler, () -> { + VisitorOptions.Builder optionsBuilder = parseOptions(request, path); + optionsBuilder = optionsBuilder.documentType(path.documentType()); + optionsBuilder = optionsBuilder.namespace(path.namespace()); + optionsBuilder = path.group().map(optionsBuilder::group).orElse(optionsBuilder); + VisitorOptions options = optionsBuilder.build(); + return () -> { + visit(request, options, handler); + return true; // VisitorSession has its own throttle handling. + }; + }); return ignoredContent; } - private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) { - Object monitor = new Object(); - return new VisitOperationsContext((type, message) -> { - synchronized (monitor) { - handleError(request, type, message, root, handler); - } - }, - token -> { - token.ifPresent(value -> root.setString("continuation", value)); - synchronized (monitor) { - respond(root, handler); - } - }, - // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway - document -> { - try { - synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization. - SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(), - documents.addObject()); - } - } - // TODO jonmv: This shouldn't happen much, but ... expose errors too? - catch (RuntimeException e) { - log.log(WARNING, "Exception serializing document in document/v1 visit response", e); - } - }); - } - private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) { - DocumentId id = path.id(); - DocumentOperationParameters parameters = parameters(); - parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters); - parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters); - executor.get(id, - parameters, - new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), - document -> { - try { - Cursor root = responseRoot(request, id); - document.map(JsonWriter::toByteArray) - .map(SlimeUtils::jsonToSlime) - .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields"))); - respond(document.isPresent() ? 200 : 404, - root, - handler); - } - catch (Exception e) { - serverError(request, new RuntimeException(e), handler); - } - })); + enqueue(request, handler, () -> { + DocumentOperationParameters rawParameters = parameters(); + rawParameters = getProperty(request, CLUSTER).map(cluster -> resolveCluster(Optional.of(cluster), clusters).route()) + .map(rawParameters::withRoute) + .orElse(rawParameters); + rawParameters = getProperty(request, FIELD_SET).map(rawParameters::withFieldSet) + .orElse(rawParameters); + DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> { + handle(path, handler, response, (document, jsonResponse) -> { + if (document != null) { + jsonResponse.writeSingleDocument(document); + jsonResponse.commit(Response.Status.OK); + } + else + jsonResponse.commit(Response.Status.NOT_FOUND); + }); + }); + return () -> dispatchOperation(request, handler, () -> asyncSession.get(path.id(), parameters)); + }); return ignoredContent; } private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { - DocumentId id = path.id(); ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant()); return new ForwardingContentChannel(in -> { - try { - DocumentPut put = parser.parsePut(in, id.toString()); + enqueue(request, handler, () -> { + DocumentPut put = parser.parsePut(in, path.id().toString()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); - executor.put(put, - getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()), - new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), - __ -> respond(responseRoot(request, id), handler))); - } - catch (IllegalArgumentException e) { - badRequest(request, e, handler); - } - catch (RuntimeException e) { - serverError(request, e, handler); - } + DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) + .orElse(parameters()); + DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response)); + return () -> dispatchOperation(request, handler, () -> asyncSession.put(put, parameters)); + }); }); } private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { - DocumentId id = path.id(); - ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.UPDATE, clock.instant()); + ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.PUT, clock.instant()); return new ForwardingContentChannel(in -> { - try { - DocumentUpdate update = parser.parseUpdate(in, id.toString()); + enqueue(request, handler, () -> { + DocumentUpdate update = parser.parseUpdate(in, path.id().toString()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent); - executor.update(update, - getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()), - new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), - __ -> respond(responseRoot(request, id), handler))); - } - catch (IllegalArgumentException e) { - badRequest(request, e, handler); - } - catch (RuntimeException e) { - serverError(request, e, handler); - } + DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) + .orElse(parameters()); + DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response)); + return () -> dispatchOperation(request, handler, () -> asyncSession.update(update, parameters)); + }); }); } private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) { - DocumentId id = path.id(); ResponseHandler handler = new MeasuringResponseHandler(rawHandler, com.yahoo.documentapi.metrics.DocumentOperationType.REMOVE, clock.instant()); - executor.remove(id, - getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()), - new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler), - __ -> respond(responseRoot(request, id), handler))); + enqueue(request, handler, () -> { + DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) + .orElse(parameters()); + DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response)); + return () -> dispatchOperation(request, handler, () -> asyncSession.remove(path.id(), parameters)); + }); return ignoredContent; } - private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) { - switch (type) { - case BAD_REQUEST: - badRequest(request, message, root, handler); - break; - case NOT_FOUND: - notFound(request, message, root, handler); - break; - case PRECONDITION_FAILED: - preconditionFailed(request, message, root, handler); - break; - case OVERLOAD: - overload(request, message, root, handler); - break; - case TIMEOUT: - timeout(request, message, root, handler); - break; - case INSUFFICIENT_STORAGE: - insufficientStorage(request, message, root, handler); - break; - default: - log.log(WARNING, "Unexpected error type '" + type + "'"); - case ERROR: // intentional fallthrough - serverError(request, message, root, handler); + /** Dispatches enqueued requests until one is blocked. */ + private void dispatchEnqueued() { + try { + while (dispatch()); + } + catch (Exception e) { + log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e); } } - // ------------------------------------------------ Responses ------------------------------------------------ - - private static Cursor responseRoot(HttpRequest request) { - Cursor root = new Slime().setObject(); - root.setString("pathId", request.getUri().getRawPath()); - return root; - } - - private static Cursor responseRoot(HttpRequest request, DocumentId id) { - Cursor root = responseRoot(request); - root.setString("id", id.toString()); - return root; - } - - private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) { - Response response = new Response(Response.Status.NO_CONTENT); - response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(","))); - handler.handleResponse(response).close(logException); - return ignoredContent; - } - - private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) { - return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler); - } - - private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) { - log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message); - root.setString("message", message); - return respond(Response.Status.BAD_REQUEST, root, handler); - } + /** Attempts to dispatch the first enqueued operations, and returns whether this was successful. */ + private boolean dispatch() { + Operation operation = operations.poll(); + if (operation == null) + return false; - private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) { - return notFound(request, - "Nothing at '" + request.getUri().getRawPath() + "'. " + - "Available paths are:\n" + String.join("\n", paths), - responseRoot(request), - handler); + if (operation.dispatch()) { + enqueued.decrementAndGet(); + return true; + } + operations.push(operation); + return false; } - private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) { - root.setString("message", message); - return respond(Response.Status.NOT_FOUND, root, handler); + /** Enqueues the given request and operation, or responds with "overload" if the queue is full. */ + private void enqueue(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) { + if (enqueued.incrementAndGet() > maxThrottled) { + enqueued.decrementAndGet(); + overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler); + return; + } + operations.offer(Operation.lazilyParsed(request, handler, operationParser)); } - private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) { - Cursor root = responseRoot(request); - root.setString("message", - "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " + - "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", "))); - return respond(Response.Status.METHOD_NOT_ALLOWED, - root, - handler); + @FunctionalInterface + interface Handler { + ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler); } - private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) { - root.setString("message", message); - return respond(Response.Status.PRECONDITION_FAILED, root, handler); - } + // ------------------------------------------------ Responses ------------------------------------------------ - private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) { - log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); - root.setString("message", message); - return respond(Response.Status.TOO_MANY_REQUESTS, root, handler); - } + /** Class for writing and returning JSON responses to document operations in a thread safe manner. */ + private static class JsonResponse implements AutoCloseable { - private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) { - log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e); - Cursor root = responseRoot(request); - root.setString("message", Exceptions.toMessageString(e)); - return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler); - } + private final BufferedContentChannel buffer = new BufferedContentChannel(); + private final OutputStream out = new ContentChannelOutputStream(buffer); + private final JsonGenerator json = jsonFactory.createGenerator(out); + private final ResponseHandler handler; + private ContentChannel channel; - private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) { - log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); - root.setString("message", message); - return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler); - } + private JsonResponse(ResponseHandler handler) throws IOException { + this.handler = handler; + json.writeStartObject(); + } - private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) { - log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); - root.setString("message", message); - return respond(Response.Status.GATEWAY_TIMEOUT, root, handler); - } + /** Creates a new JsonResponse with path and id fields written. */ + static JsonResponse create(DocumentPath path, ResponseHandler handler) throws IOException { + JsonResponse response = new JsonResponse(handler); + response.writePathId(path.rawPath()); + response.writeDocId(path.id()); + return response; + } - private static ContentChannel insufficientStorage(HttpRequest request, String message, Cursor root, ResponseHandler handler) { - log.log(FINE, () -> "Insufficient storage for " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); - root.setString("message", message); - return respond(Response.Status.INSUFFICIENT_STORAGE, root, handler); - } + /** Creates a new JsonResponse with path field written. */ + static JsonResponse create(HttpRequest request, ResponseHandler handler) throws IOException { + JsonResponse response = new JsonResponse(handler); + response.writePathId(request.getUri().getRawPath()); + return response; + } - private static ContentChannel respond(Inspector root, ResponseHandler handler) { - return respond(200, root, handler); - } + /** Creates a new JsonResponse with path and message fields written. */ + static JsonResponse create(HttpRequest request, String message, ResponseHandler handler) throws IOException { + JsonResponse response = new JsonResponse(handler); + response.writePathId(request.getUri().getRawPath()); + response.writeMessage(message); + return response; + } - private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) { - Response response = new Response(status); - response.headers().put("Content-Type", "application/json; charset=UTF-8"); - ContentChannel out = null; - try { - out = handler.handleResponse(response); - out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException); + /** Commits a response with the given status code and some default headers, and writes whatever content is buffered. */ + synchronized void commit(int status) throws IOException { + Response response = new Response(status); + response.headers().addAll(Map.of("Content-Type", List.of("application/json; charset=UTF-8"))); + try { + channel = handler.handleResponse(response); + buffer.connectTo(channel); + } + catch (RuntimeException e) { + throw new IOException(e); + } } - catch (Exception e) { - log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e)); + + /** Commits a response with the given status code and some default headers, writes buffered content, and closes this. */ + synchronized void respond(int status) throws IOException { + try (this) { + commit(status); + } } - finally { - if (out != null) try { - out.close(logException); + + /** Closes the JSON and the output content channel of this. */ + @Override + public synchronized void close() throws IOException { + try { + if (channel == null) { + log.log(WARNING, "Close called before response was committed, in " + getClass().getName()); + commit(Response.Status.INTERNAL_SERVER_ERROR); + } + json.close(); // Also closes object and array scopes. + out.close(); // Simply flushes the output stream. } - catch (Exception e) { - log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e)); + finally { + if (channel != null) + channel.close(logException); // Closes the response handler's content channel. } } - return ignoredContent; - } - // ------------------------------------------------ Helpers ------------------------------------------------ + synchronized void writePathId(String path) throws IOException { + json.writeStringField("pathId", path); + } - private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) { - VisitorOptions.Builder options = VisitorOptions.builder(); + synchronized void writeMessage(String message) throws IOException { + json.writeStringField("message", message); + } - getProperty(request, SELECTION).ifPresent(options::selection); - getProperty(request, CONTINUATION).ifPresent(options::continuation); - getProperty(request, FIELD_SET).ifPresent(options::fieldSet); - getProperty(request, CLUSTER).ifPresent(options::cluster); - getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace); - getProperty(request, WANTED_DOCUMENT_COUNT, numberParser) - .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count))); - getProperty(request, CONCURRENCY, numberParser) - .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency))); + synchronized void writeDocId(DocumentId id) throws IOException { + json.writeStringField("id", id.toString()); + } - return options; - } + synchronized void writeSingleDocument(Document document) throws IOException { + new JsonWriter(json).writeFields(document); + } - static class DocumentPath { + synchronized void writeDocumentsArrayStart() throws IOException { + json.writeArrayFieldStart("documents"); + } - private final Path path; - private final Optional<Group> group; + synchronized void writeDocumentValue(Document document) throws IOException { + new JsonWriter(json).write(document); + } - DocumentPath(Path path) { - this.path = requireNonNull(path); - this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of) - .or(() -> Optional.ofNullable(path.get("group")).map(Group::of)); + synchronized void writeArrayEnd() throws IOException { + json.writeEndArray(); } - DocumentId id() { - return new DocumentId("id:" + requireNonNull(path.get("namespace")) + - ":" + requireNonNull(path.get("documentType")) + - ":" + group.map(Group::docIdPart).orElse("") + - ":" + requireNonNull(path.get("docid"))); + synchronized void writeContinuation(String token) throws IOException { + json.writeStringField("continuation", token); } - String documentType() { return requireNonNull(path.get("documentType")); } - String namespace() { return requireNonNull(path.get("namespace")); } - Optional<Group> group() { return group; } + } + private static void options(Collection<Method> methods, ResponseHandler handler) { + loggingException(() -> { + Response response = new Response(Response.Status.NO_CONTENT); + response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(","))); + handler.handleResponse(response).close(logException); + }); } - private static Optional<String> getProperty(HttpRequest request, String name) { - List<String> values = request.parameters().get(name); - if (values != null && values.size() != 0) - return Optional.ofNullable(values.get(values.size() - 1)); + private static void badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) { + loggingException(() -> { + String message = Exceptions.toMessageString(e); + log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message); + JsonResponse.create(request, message, handler).respond(Response.Status.BAD_REQUEST); + }); + } - return Optional.empty(); + private static void notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) { + loggingException(() -> { + JsonResponse.create(request, + "Nothing at '" + request.getUri().getRawPath() + "'. " + + "Available paths are:\n" + String.join("\n", paths), + handler) + .respond(Response.Status.NOT_FOUND); + }); } - private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) { - return getProperty(request, name).map(parser::parse); + private static void methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) { + loggingException(() -> { + JsonResponse.create(request, + "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " + + "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")), + handler) + .respond(Response.Status.METHOD_NOT_ALLOWED); + }); } + private static void overload(HttpRequest request, String message, ResponseHandler handler) { + loggingException(() -> { + log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); + JsonResponse.create(request, message, handler).respond(Response.Status.TOO_MANY_REQUESTS); + }); + } - @FunctionalInterface - interface Parser<T> extends Function<String, T> { - default T parse(String value) { - try { - return apply(value); - } - catch (RuntimeException e) { - throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e)); - } + private static void serverError(HttpRequest request, Throwable t, ResponseHandler handler) { + loggingException(() -> { + log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", t); + JsonResponse.create(request, Exceptions.toMessageString(t), handler).respond(Response.Status.INTERNAL_SERVER_ERROR); + }); + } + + private static void timeout(HttpRequest request, String message, ResponseHandler handler) { + loggingException(() -> { + log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message); + JsonResponse.create(request, message, handler).respond(Response.Status.GATEWAY_TIMEOUT); + }); + } + + private static void loggingException(Exceptions.RunnableThrowingIOException runnable) { + try { + runnable.run(); + } + catch (Exception e) { + log.log(FINE, "Failed writing response", e); } } + // ---------------------------------------------Document Operations ---------------------------------------- @FunctionalInterface - interface Handler { - ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler); + interface Operation { + + /** + * Attempts to dispatch this operation to the document API, and returns whether this completed or not. + * This return {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if + * dispatch should be retried at a later time. + */ + boolean dispatch(); + + /** Wraps the operation parser in an Operation that is parsed the first time it is attempted dispatched. */ + static Operation lazilyParsed(HttpRequest request, ResponseHandler handler, Supplier<Operation> parser) { + AtomicReference<Operation> operation = new AtomicReference<>(); + return () -> { + try { + operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch(); + } + catch (IllegalArgumentException e) { + badRequest(request, e, handler); + } + catch (RuntimeException e) { + serverError(request, e, handler); + } + return true; + }; + } + } + /** Attempts to send the given document operation, returning false if thes needs to be retried. */ + private static boolean dispatchOperation(HttpRequest request, ResponseHandler handler, Supplier<Result> documentOperation) { + if (request.isCancelled()) + return true; + + Result result = documentOperation.get(); + if (result.type() == Result.ResultType.TRANSIENT_ERROR) + return false; + + if (result.type() == Result.ResultType.FATAL_ERROR) + serverError(request, result.getError(), handler); + + return true; + } /** Readable content channel which forwards data to a reader when closed. */ static class ForwardingContentChannel implements ContentChannel { @@ -589,11 +668,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } - static class DocumentOperationParser { - private static final JsonFactory jsonFactory = new JsonFactory(); - private final DocumentTypeManager manager; DocumentOperationParser(DocumentmanagerConfig config) { @@ -614,6 +690,334 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } + interface SuccessCallback { + void onSuccess(Document document, JsonResponse response) throws IOException; + } + + private static void handle(DocumentPath path, ResponseHandler handler, com.yahoo.documentapi.Response response, SuccessCallback callback) { + try (JsonResponse jsonResponse = JsonResponse.create(path, handler)) { + if (response.isSuccess()) + callback.onSuccess((response instanceof DocumentResponse) ? ((DocumentResponse) response).getDocument() : null, jsonResponse); + else { + jsonResponse.writeMessage(response.getTextMessage()); + switch (response.outcome()) { + case NOT_FOUND: + jsonResponse.commit(Response.Status.NOT_FOUND); + break; + case CONDITION_FAILED: + jsonResponse.commit(Response.Status.PRECONDITION_FAILED); + break; + case INSUFFICIENT_STORAGE: + log.log(WARNING, "Insufficient storage left in cluster: " + response.getTextMessage()); + jsonResponse.commit(Response.Status.INSUFFICIENT_STORAGE); + break; + default: + log.log(WARNING, "Unexpected document API operation outcome '" + response.outcome() + "'"); + case ERROR: + log.log(WARNING, "Exception performing document operation: " + response.getTextMessage()); + jsonResponse.commit(Response.Status.INTERNAL_SERVER_ERROR); + } + } + } + catch (Exception e) { + log.log(FINE, "Failed writing response", e); + } + } + + private static void handle(DocumentPath path, ResponseHandler handler, com.yahoo.documentapi.Response response) { + handle(path, handler, response, (document, jsonResponse) -> jsonResponse.commit(Response.Status.OK)); + } + + // ------------------------------------------------- Visits ------------------------------------------------ + + private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) { + VisitorOptions.Builder options = VisitorOptions.builder(); + + getProperty(request, SELECTION).ifPresent(options::selection); + getProperty(request, CONTINUATION).ifPresent(options::continuation); + getProperty(request, FIELD_SET).ifPresent(options::fieldSet); + getProperty(request, CLUSTER).ifPresent(options::cluster); + getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace); + getProperty(request, WANTED_DOCUMENT_COUNT, numberParser) + .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count))); + getProperty(request, CONCURRENCY, numberParser) + .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency))); + + return options; + } + + private static VisitorParameters asParameters(VisitorOptions options, Map<String, StorageCluster> clusters, Duration visitTimeout) { + if (options.cluster.isEmpty() && options.documentType.isEmpty()) + throw new IllegalArgumentException("Must set 'cluster' parameter to a valid content cluster id when visiting at a root /document/v1/ level"); + + VisitorParameters parameters = new VisitorParameters(Stream.of(options.selection, + options.documentType, + options.namespace.map(value -> "id.namespace=='" + value + "'"), + options.group.map(Group::selection)) + .flatMap(Optional::stream) + .reduce(new StringJoiner(") and (", "(", ")").setEmptyValue(""), // don't mind the lonely chicken to the right + StringJoiner::add, + StringJoiner::merge) + .toString()); + + options.continuation.map(ProgressToken::fromSerializedString).ifPresent(parameters::setResumeToken); + parameters.setFieldSet(options.fieldSet.orElse(options.documentType.map(type -> type + ":[document]").orElse(AllFields.NAME))); + options.wantedDocumentCount.ifPresent(count -> { if (count <= 0) throw new IllegalArgumentException("wantedDocumentCount must be positive"); }); + parameters.setMaxTotalHits(options.wantedDocumentCount.orElse(1 << 10)); + options.concurrency.ifPresent(value -> { if (value <= 0) throw new IllegalArgumentException("concurrency must be positive"); }); + parameters.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(options.concurrency.orElse(1))); + parameters.setTimeoutMs(visitTimeout.toMillis()); + parameters.visitInconsistentBuckets(true); + parameters.setPriority(DocumentProtocol.Priority.NORMAL_4); + + StorageCluster storageCluster = resolveCluster(options.cluster, clusters); + parameters.setRoute(storageCluster.route()); + parameters.setBucketSpace(resolveBucket(storageCluster, + options.documentType, + List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), + options.bucketSpace)); + + return parameters; + } + + private void visit(HttpRequest request, VisitorOptions options, ResponseHandler handler) { + try { + JsonResponse response = JsonResponse.create(request, handler); + response.writeDocumentsArrayStart(); + CountDownLatch latch = new CountDownLatch(1); + VisitorParameters parameters = asParameters(options, clusters, visitTimeout); + parameters.setLocalDataHandler(new DumpVisitorDataHandler() { + @Override public void onDocument(Document doc, long timeStamp) { + loggingException(() -> { + response.writeDocumentValue(doc); + }); + } + @Override public void onRemove(DocumentId id) { } // We don't visit removes. + }); + parameters.setControlHandler(new VisitorControlHandler() { + @Override public void onDone(CompletionCode code, String message) { + super.onDone(code, message); + loggingException(() -> { + response.writeArrayEnd(); // Close "documents" array. + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets()) { + response.writeMessage("No buckets visited within timeout of " + visitTimeout); + response.commit(Response.Status.GATEWAY_TIMEOUT); + break; + } + case SUCCESS: // Intentional fallthrough. + case ABORTED: // Intentional fallthrough. + if (getProgress() != null && ! getProgress().isFinished()) + response.writeContinuation(getProgress().serializeToString()); + + response.commit(Response.Status.OK); + break; + default: + response.writeMessage(message != null ? message : "Visiting failed"); + response.commit(Response.Status.INTERNAL_SERVER_ERROR); + } + executor.execute(() -> { + try { + latch.await(); // We may get here while dispatching thread is still putting us in the map. + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + visits.get(this).destroy(); + }); + }); + } + }); + visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); + latch.countDown(); + } + catch (IllegalArgumentException e) { + badRequest(request, e, handler); + } + catch (ParseException e) { + badRequest(request, new IllegalArgumentException(e), handler); + } + catch (RuntimeException e) { + serverError(request, e, handler); + } + catch (Exception e) { + log.log(FINE, "Failed writing response", e); + } + } + + // TODO jonmv: Inline this class. + static class VisitorOptions { + + final Optional<String> cluster; + final Optional<String> namespace; + final Optional<String> documentType; + final Optional<Group> group; + final Optional<String> selection; + final Optional<String> fieldSet; + final Optional<String> continuation; + final Optional<String> bucketSpace; + final Optional<Integer> wantedDocumentCount; + final Optional<Integer> concurrency; + + private VisitorOptions(Optional<String> cluster, Optional<String> documentType, Optional<String> namespace, + Optional<Group> group, Optional<String> selection, Optional<String> fieldSet, + Optional<String> continuation, Optional<String> bucketSpace, + Optional<Integer> wantedDocumentCount, Optional<Integer> concurrency) { + this.cluster = cluster; + this.namespace = namespace; + this.documentType = documentType; + this.group = group; + this.selection = selection; + this.fieldSet = fieldSet; + this.continuation = continuation; + this.bucketSpace = bucketSpace; + this.wantedDocumentCount = wantedDocumentCount; + this.concurrency = concurrency; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VisitorOptions that = (VisitorOptions) o; + return cluster.equals(that.cluster) && + namespace.equals(that.namespace) && + documentType.equals(that.documentType) && + group.equals(that.group) && + selection.equals(that.selection) && + fieldSet.equals(that.fieldSet) && + continuation.equals(that.continuation) && + bucketSpace.equals(that.bucketSpace) && + wantedDocumentCount.equals(that.wantedDocumentCount) && + concurrency.equals(that.concurrency); + } + + @Override + public int hashCode() { + return Objects.hash(cluster, namespace, documentType, group, selection, fieldSet, continuation, bucketSpace, wantedDocumentCount, concurrency); + } + + @Override + public String toString() { + return "VisitorOptions{" + + "cluster=" + cluster + + ", namespace=" + namespace + + ", documentType=" + documentType + + ", group=" + group + + ", selection=" + selection + + ", fieldSet=" + fieldSet + + ", continuation=" + continuation + + ", bucketSpace=" + bucketSpace + + ", wantedDocumentCount=" + wantedDocumentCount + + ", concurrency=" + concurrency + + '}'; + } + + public static Builder builder() { return new Builder(); } + + + public static class Builder { + + private String cluster; + private String documentType; + private String namespace; + private Group group; + private String selection; + private String fieldSet; + private String continuation; + private String bucketSpace; + private Integer wantedDocumentCount; + private Integer concurrency; + + public Builder cluster(String cluster) { + this.cluster = cluster; + return this; + } + + public Builder documentType(String documentType) { + this.documentType = documentType; + return this; + } + + public Builder namespace(String namespace) { + this.namespace = namespace; + return this; + } + + public Builder group(Group group) { + this.group = group; + return this; + } + + public Builder selection(String selection) { + this.selection = selection; + return this; + } + + public Builder fieldSet(String fieldSet) { + this.fieldSet = fieldSet; + return this; + } + + public Builder continuation(String continuation) { + this.continuation = continuation; + return this; + } + + public Builder bucketSpace(String bucketSpace) { + this.bucketSpace = bucketSpace; + return this; + } + + public Builder wantedDocumentCount(Integer wantedDocumentCount) { + this.wantedDocumentCount = wantedDocumentCount; + return this; + } + + public Builder concurrency(Integer concurrency) { + this.concurrency = concurrency; + return this; + } + + public VisitorOptions build() { + return new VisitorOptions(Optional.ofNullable(cluster), Optional.ofNullable(documentType), + Optional.ofNullable(namespace), Optional.ofNullable(group), + Optional.ofNullable(selection), Optional.ofNullable(fieldSet), + Optional.ofNullable(continuation), Optional.ofNullable(bucketSpace), + Optional.ofNullable(wantedDocumentCount), Optional.ofNullable(concurrency)); + } + + } + + } + + // ------------------------------------------------ Helpers ------------------------------------------------ + + private static Optional<String> getProperty(HttpRequest request, String name) { + List<String> values = request.parameters().get(name); + if (values != null && values.size() != 0) + return Optional.ofNullable(values.get(values.size() - 1)); + + return Optional.empty(); + } + + private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) { + return getProperty(request, name).map(parser::parse); + } + + @FunctionalInterface + interface Parser<T> extends Function<String, T> { + default T parse(String value) { + try { + return apply(value); + } + catch (RuntimeException e) { + throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e)); + } + } + } + private class MeasuringResponseHandler implements ResponseHandler { private final ResponseHandler delegate; @@ -638,4 +1042,139 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } + static class StorageCluster { + + private final String name; + private final String configId; + private final Map<String, String> documentBuckets; + + StorageCluster(String name, String configId, Map<String, String> documentBuckets) { + this.name = requireNonNull(name); + this.configId = requireNonNull(configId); + this.documentBuckets = Map.copyOf(documentBuckets); + } + + String name() { return name; } + String configId() { return configId; } + String route() { return "[Storage:cluster=" + name() + ";clusterconfigid=" + configId() + "]"; } + Optional<String> bucketOf(String documentType) { return Optional.ofNullable(documentBuckets.get(documentType)); } + + } + + private static Map<String, StorageCluster> parseClusters(ClusterListConfig clusters, AllClustersBucketSpacesConfig buckets) { + return clusters.storage().stream() + .collect(toUnmodifiableMap(storage -> storage.name(), + storage -> new StorageCluster(storage.name(), + storage.configid(), + buckets.cluster(storage.name()) + .documentType().entrySet().stream() + .collect(toMap(entry -> entry.getKey(), + entry -> entry.getValue().bucketSpace()))))); + } + + static StorageCluster resolveCluster(Optional<String> wanted, Map<String, StorageCluster> clusters) { + if (clusters.isEmpty()) + throw new IllegalArgumentException("Your Vespa deployment has no content clusters, so the document API is not enabled"); + + return wanted.map(cluster -> { + if ( ! clusters.containsKey(cluster)) + throw new IllegalArgumentException("Your Vespa deployment has no content cluster '" + cluster + "', only '" + + String.join("', '", clusters.keySet()) + "'"); + + return clusters.get(cluster); + }).orElseGet(() -> { + if (clusters.size() > 1) + throw new IllegalArgumentException("Please specify one of the content clusters in your Vespa deployment: '" + + String.join("', '", clusters.keySet()) + "'"); + + return clusters.values().iterator().next(); + }); + } + + private static String resolveBucket(StorageCluster cluster, Optional<String> documentType, + List<String> bucketSpaces, Optional<String> bucketSpace) { + return documentType.map(type -> cluster.bucketOf(type) + .orElseThrow(() -> new IllegalArgumentException("Document type '" + type + "' in cluster '" + cluster.name() + + "' is not mapped to a known bucket space"))) + .or(() -> bucketSpace.map(space -> { + if ( ! bucketSpaces.contains(space)) + throw new IllegalArgumentException("Bucket space '" + space + "' is not a known bucket space; expected one of " + + String.join(", ", bucketSpaces)); + return space; + })) + .orElse(FixedBucketSpaces.defaultSpace()); + } + + private static class DocumentPath { + + private final Path path; + private final Optional<Group> group; + + DocumentPath(Path path) { + this.path = requireNonNull(path); + this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of) + .or(() -> Optional.ofNullable(path.get("group")).map(Group::of)); + } + + DocumentId id() { + return new DocumentId("id:" + requireNonNull(path.get("namespace")) + + ":" + requireNonNull(path.get("documentType")) + + ":" + group.map(Group::docIdPart).orElse("") + + ":" + requireNonNull(path.get("docid"))); + } + + String rawPath() { return path.asString(); } + String documentType() { return requireNonNull(path.get("documentType")); } + String namespace() { return requireNonNull(path.get("namespace")); } + Optional<Group> group() { return group; } + + } + + static class Group { + + private final String value; + private final String docIdPart; + private final String selection; + + private Group(String value, String docIdPart, String selection) { + Text.validateTextString(value) + .ifPresent(codePoint -> { throw new IllegalArgumentException(String.format("Illegal code point U%04X in group", codePoint)); }); + this.value = value; + this.docIdPart = docIdPart; + this.selection = selection; + } + + public static Group of(long value) { return new Group(Long.toString(value), "n=" + value, "id.user==" + value); } + public static Group of(String value) { return new Group(value, "g=" + value, "id.group=='" + value.replaceAll("'", "\\'") + "'"); } + + public String value() { return value; } + public String docIdPart() { return docIdPart; } + public String selection() { return selection; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Group group = (Group) o; + return value.equals(group.value) && + docIdPart.equals(group.docIdPart) && + selection.equals(group.selection); + } + + @Override + public int hashCode() { + return Objects.hash(value, docIdPart, selection); + } + + @Override + public String toString() { + return "Group{" + + "value='" + value + '\'' + + ", docIdPart='" + docIdPart + '\'' + + ", selection='" + selection + '\'' + + '}'; + } + + } + } |