summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java603
1 files changed, 603 insertions, 0 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
new file mode 100644
index 00000000000..96ea6c08f86
--- /dev/null
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -0,0 +1,603 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.document.restapi.resource;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.google.inject.Inject;
+import com.yahoo.cloud.config.ClusterListConfig;
+import com.yahoo.container.core.documentapi.VespaDocumentAccess;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.document.json.JsonReader;
+import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.json.document.DocumentParser;
+import com.yahoo.document.restapi.DocumentOperationExecutor;
+import com.yahoo.document.restapi.DocumentOperationExecutor.ErrorType;
+import com.yahoo.document.restapi.DocumentOperationExecutor.Group;
+import com.yahoo.document.restapi.DocumentOperationExecutor.OperationContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitOperationsContext;
+import com.yahoo.document.restapi.DocumentOperationExecutor.VisitorOptions;
+import com.yahoo.document.restapi.DocumentOperationExecutorConfig;
+import com.yahoo.documentapi.DocumentOperationParameters;
+import com.yahoo.documentapi.metrics.DocumentApiMetrics;
+import com.yahoo.documentapi.metrics.DocumentOperationStatus;
+import com.yahoo.documentapi.metrics.DocumentOperationType;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.CompletionHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ReadableContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.jdisc.handler.UnsafeContentInputStream;
+import com.yahoo.container.core.HandlerMetricContextUtil;
+import com.yahoo.jdisc.http.HttpRequest;
+import com.yahoo.jdisc.http.HttpRequest.Method;
+import com.yahoo.metrics.simple.MetricReceiver;
+import com.yahoo.restapi.Path;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Inspector;
+import com.yahoo.slime.Slime;
+import com.yahoo.slime.SlimeUtils;
+import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.logging.Logger;
+
+import static com.yahoo.documentapi.DocumentOperationParameters.parameters;
+import static com.yahoo.jdisc.http.HttpRequest.Method.DELETE;
+import static com.yahoo.jdisc.http.HttpRequest.Method.GET;
+import static com.yahoo.jdisc.http.HttpRequest.Method.OPTIONS;
+import static com.yahoo.jdisc.http.HttpRequest.Method.POST;
+import static com.yahoo.jdisc.http.HttpRequest.Method.PUT;
+import static java.util.Objects.requireNonNull;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.WARNING;
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Asynchronous HTTP handler for /document/v1/
+ *
+ * @author jonmv
+ */
+public class DocumentV1ApiHandler extends AbstractRequestHandler {
+
+ private static final Logger log = Logger.getLogger(DocumentV1ApiHandler.class.getName());
+ private static final Parser<Integer> numberParser = Integer::parseInt;
+ private static final Parser<Boolean> booleanParser = Boolean::parseBoolean;
+
+ private static final CompletionHandler logException = new CompletionHandler() {
+ @Override public void completed() { }
+ @Override public void failed(Throwable t) {
+ log.log(WARNING, "Exception writing response data", t);
+ }
+ };
+
+ private static final ContentChannel ignoredContent = new ContentChannel() {
+ @Override public void write(ByteBuffer buf, CompletionHandler handler) { handler.completed(); }
+ @Override public void close(CompletionHandler handler) { handler.completed(); }
+ };
+
+ private static final String CREATE = "create";
+ private static final String CONDITION = "condition";
+ private static final String ROUTE = "route"; // TODO jonmv: set for everything except Get
+ private static final String FIELD_SET = "fieldSet";
+ private static final String SELECTION = "selection";
+ private static final String CLUSTER = "cluster"; // TODO jonmv: set for Get
+ private static final String CONTINUATION = "continuation";
+ private static final String WANTED_DOCUMENT_COUNT = "wantedDocumentCount";
+ private static final String CONCURRENCY = "concurrency";
+ private static final String BUCKET_SPACE = "bucketSpace";
+
+ private final Clock clock;
+ private final Metric metric; // TODO jonmv: make response class which logs on completion/error
+ private final DocumentApiMetrics metrics;
+ private final DocumentOperationExecutor executor;
+ private final DocumentOperationParser parser;
+ private final Map<String, Map<Method, Handler>> handlers;
+
+ @Inject
+ public DocumentV1ApiHandler(Clock clock,
+ Metric metric,
+ MetricReceiver metricReceiver,
+ VespaDocumentAccess documentAccess,
+ DocumentmanagerConfig documentManagerConfig,
+ ClusterListConfig clusterListConfig,
+ AllClustersBucketSpacesConfig bucketSpacesConfig,
+ DocumentOperationExecutorConfig executorConfig) {
+ this(clock,
+ new DocumentOperationExecutor(clusterListConfig, bucketSpacesConfig, executorConfig, documentAccess, clock),
+ new DocumentOperationParser(documentManagerConfig),
+ metric,
+ metricReceiver);
+ }
+
+ DocumentV1ApiHandler(Clock clock, DocumentOperationExecutor executor, DocumentOperationParser parser,
+ Metric metric, MetricReceiver metricReceiver) {
+ this.clock = clock;
+ this.executor = executor;
+ this.parser = parser;
+ this.metric = metric;
+ this.metrics = new DocumentApiMetrics(metricReceiver, "documentV1");
+ this.handlers = defineApi();
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request rawRequest, ResponseHandler rawResponseHandler) {
+ HandlerMetricContextUtil.onHandle(rawRequest, metric, getClass());
+ ResponseHandler responseHandler = response -> {
+ HandlerMetricContextUtil.onHandled(rawRequest, metric, getClass());
+ return rawResponseHandler.handleResponse(response);
+ };
+
+ HttpRequest request = (HttpRequest) rawRequest;
+ try {
+ Path requestPath = new Path(request.getUri());
+ for (String path : handlers.keySet())
+ if (requestPath.matches(path)) {
+ Map<Method, Handler> methods = handlers.get(path);
+ if (methods.containsKey(request.getMethod()))
+ return methods.get(request.getMethod()).handle(request, new DocumentPath(requestPath), responseHandler);
+
+ if (request.getMethod() == OPTIONS)
+ return options(methods.keySet(), responseHandler);
+
+ return methodNotAllowed(request, methods.keySet(), responseHandler);
+ }
+ return notFound(request, handlers.keySet(), responseHandler);
+ }
+ catch (IllegalArgumentException e) {
+ return badRequest(request, e, responseHandler);
+ }
+ catch (RuntimeException e) {
+ return serverError(request, e, responseHandler);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ this.executor.shutdown();
+ }
+
+ private Map<String, Map<Method, Handler>> defineApi() {
+ Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
+
+ handlers.put("/document/v1/",
+ Map.of(GET, this::getRoot));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
+ Map.of(GET, this::getDocumentType));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/docid/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/{docid}",
+ Map.of(GET, this::getDocument,
+ POST, this::postDocument,
+ PUT, this::putDocument,
+ DELETE, this::deleteDocument));
+
+ return Collections.unmodifiableMap(handlers);
+ }
+
+ private ContentChannel getRoot(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ Cursor documents = root.setArray("documents");
+ executor.visit(parseOptions(request, path).build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private ContentChannel getDocumentType(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ VisitorOptions.Builder options = parseOptions(request, path);
+ options = options.documentType(path.documentType());
+ options = options.namespace(path.namespace());
+ options = path.group().map(options::group).orElse(options);
+ executor.visit(options.build(), visitorContext(request, root, root.setArray("documents"), handler));
+ return ignoredContent;
+ }
+
+ private static VisitOperationsContext visitorContext(HttpRequest request, Cursor root, Cursor documents, ResponseHandler handler) {
+ Object monitor = new Object();
+ return new VisitOperationsContext((type, message) -> {
+ synchronized (monitor) {
+ handleError(request, type, message, root, handler);
+ }
+ },
+ token -> {
+ token.ifPresent(value -> root.setString("continuation", value));
+ synchronized (monitor) {
+ respond(root, handler);
+ }
+ },
+ // TODO jonmv: make streaming — first doc indicates 200 OK anyway — unless session dies, which is a semi-200 anyway
+ document -> {
+ synchronized (monitor) { // Putting things into the slime is not thread safe, so need synchronization.
+ SlimeUtils.copyObject(SlimeUtils.jsonToSlime(JsonWriter.toByteArray(document)).get(),
+ documents.addObject());
+ }
+ });
+ }
+ private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ DocumentId id = path.id();
+ DocumentOperationParameters parameters = parameters();
+ parameters = getProperty(request, CLUSTER).map(executor::routeToCluster).map(parameters::withRoute).orElse(parameters);
+ parameters = getProperty(request, FIELD_SET).map(parameters::withFieldSet).orElse(parameters);
+ executor.get(id,
+ parameters,
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ document -> {
+ Cursor root = responseRoot(request, id);
+ document.map(JsonWriter::toByteArray)
+ .map(SlimeUtils::jsonToSlime)
+ .ifPresent(doc -> SlimeUtils.copyObject(doc.get().field("fields"), root.setObject("fields)")));
+ respond(document.isPresent() ? 200 : 404,
+ root,
+ handler);
+ }));
+ return ignoredContent;
+ }
+
+ private ContentChannel postDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.PUT, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentPut put = parser.parsePut(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
+ executor.put(put,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
+ }
+ });
+ }
+
+ private ContentChannel putDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.UPDATE, clock.instant());
+ return new ForwardingContentChannel(in -> {
+ try {
+ DocumentUpdate update = parser.parseUpdate(in, id.toString());
+ getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
+ getProperty(request, CREATE).map(booleanParser::parse).ifPresent(update::setCreateIfNonExistent);
+ executor.update(update,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, Exceptions.toMessageString(e), responseRoot(request, id), handler);
+ }
+ });
+ }
+
+ private ContentChannel deleteDocument(HttpRequest request, DocumentPath path, ResponseHandler rawHandler) {
+ DocumentId id = path.id();
+ ResponseHandler handler = new MeasuringResponseHandler(rawHandler, DocumentOperationType.REMOVE, clock.instant());
+ executor.remove(id,
+ getProperty(request, ROUTE).map(parameters()::withRoute).orElse(parameters()),
+ new OperationContext((type, message) -> handleError(request, type, message, responseRoot(request, id), handler),
+ __ -> respond(responseRoot(request, id), handler)));
+ return ignoredContent;
+ }
+
+ private static void handleError(HttpRequest request, ErrorType type, String message, Cursor root, ResponseHandler handler) {
+ switch (type) {
+ case BAD_REQUEST:
+ badRequest(request, message, root, handler);
+ break;
+ case NOT_FOUND:
+ notFound(request, message, root, handler);
+ break;
+ case PRECONDITION_FAILED:
+ preconditionFailed(request, message, root, handler);
+ break;
+ case OVERLOAD:
+ overload(request, message, root, handler);
+ break;
+ case TIMEOUT:
+ timeout(request, message, root, handler);
+ break;
+ default:
+ log.log(WARNING, "Unexpected error type '" + type + "'");
+ case ERROR: // intentional fallthrough
+ serverError(request, message, root, handler);
+ }
+ }
+
+ // ------------------------------------------------ Responses ------------------------------------------------
+
+ private static Cursor responseRoot(HttpRequest request) {
+ Cursor root = new Slime().setObject();
+ root.setString("pathId", request.getUri().getRawPath());
+ return root;
+ }
+
+ private static Cursor responseRoot(HttpRequest request, DocumentId id) {
+ Cursor root = responseRoot(request);
+ root.setString("id", id.toString());
+ return root;
+ }
+
+ private static ContentChannel options(Collection<Method> methods, ResponseHandler handler) {
+ Response response = new Response(Response.Status.NO_CONTENT);
+ response.headers().add("Allow", methods.stream().sorted().map(Method::name).collect(joining(",")));
+ handler.handleResponse(response).close(logException);
+ return ignoredContent;
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, IllegalArgumentException e, ResponseHandler handler) {
+ return badRequest(request, Exceptions.toMessageString(e), responseRoot(request), handler);
+ }
+
+ private static ContentChannel badRequest(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Bad request for " + request.getMethod() + " at " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.BAD_REQUEST, root, handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, Collection<String> paths, ResponseHandler handler) {
+ return notFound(request,
+ "Nothing at '" + request.getUri().getRawPath() + "'. " +
+ "Available paths are:\n" + String.join("\n", paths),
+ responseRoot(request),
+ handler);
+ }
+
+ private static ContentChannel notFound(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.NOT_FOUND, root, handler);
+ }
+
+ private static ContentChannel methodNotAllowed(HttpRequest request, Collection<Method> methods, ResponseHandler handler) {
+ Cursor root = responseRoot(request);
+ root.setString("message",
+ "'" + request.getMethod() + "' not allowed at '" + request.getUri().getRawPath() + "'. " +
+ "Allowed methods are: " + methods.stream().sorted().map(Method::name).collect(joining(", ")));
+ return respond(Response.Status.METHOD_NOT_ALLOWED,
+ root,
+ handler);
+ }
+
+ private static ContentChannel preconditionFailed(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ root.setString("message", message);
+ return respond(Response.Status.PRECONDITION_FAILED, root, handler);
+ }
+
+ private static ContentChannel overload(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Overload handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.TOO_MANY_REQUESTS, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, RuntimeException e, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ":", e);
+ Cursor root = responseRoot(request);
+ root.setString("message", Exceptions.toMessageString(e));
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel serverError(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(WARNING, "Uncaught exception handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.INTERNAL_SERVER_ERROR, root, handler);
+ }
+
+ private static ContentChannel timeout(HttpRequest request, String message, Cursor root, ResponseHandler handler) {
+ log.log(FINE, () -> "Timeout handling request " + request.getMethod() + " " + request.getUri().getRawPath() + ": " + message);
+ root.setString("message", message);
+ return respond(Response.Status.GATEWAY_TIMEOUT, root, handler);
+ }
+
+ private static ContentChannel respond(Inspector root, ResponseHandler handler) {
+ return respond(200, root, handler);
+ }
+
+ private static ContentChannel respond(int status, Inspector root, ResponseHandler handler) {
+ Response response = new Response(status);
+ response.headers().put("Content-Type", "application/json; charset=UTF-8");
+ ContentChannel out = null;
+ try {
+ out = handler.handleResponse(new Response(status));
+ out.write(ByteBuffer.wrap(Exceptions.uncheck(() -> SlimeUtils.toJsonBytes(root))), logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems writing data to jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ finally {
+ if (out != null) try {
+ out.close(logException);
+ }
+ catch (Exception e) {
+ log.log(FINE, () -> "Problems closing jDisc content channel: " + Exceptions.toMessageString(e));
+ }
+ }
+ return ignoredContent;
+ }
+
+ // ------------------------------------------------ Helpers ------------------------------------------------
+
+ private VisitorOptions.Builder parseOptions(HttpRequest request, DocumentPath path) {
+ VisitorOptions.Builder options = VisitorOptions.builder();
+
+ getProperty(request, SELECTION).ifPresent(options::selection);
+ getProperty(request, CONTINUATION).ifPresent(options::continuation);
+ getProperty(request, FIELD_SET).ifPresent(options::fieldSet);
+ getProperty(request, CLUSTER).ifPresent(options::cluster);
+ getProperty(request, BUCKET_SPACE).ifPresent(options::bucketSpace);
+ getProperty(request, WANTED_DOCUMENT_COUNT, numberParser)
+ .ifPresent(count -> options.wantedDocumentCount(Math.min(1 << 10, count)));
+ getProperty(request, CONCURRENCY, numberParser)
+ .ifPresent(concurrency -> options.concurrency(Math.min(100, concurrency)));
+
+ return options;
+ }
+
+ static class DocumentPath {
+
+ private final Path path;
+ private final Optional<Group> group;
+
+ DocumentPath(Path path) {
+ this.path = requireNonNull(path);
+ this.group = Optional.ofNullable(path.get("number")).map(numberParser::parse).map(Group::of)
+ .or(() -> Optional.ofNullable(path.get("group")).map(Group::of));
+ }
+
+ DocumentId id() {
+ return new DocumentId("id:" + requireNonNull(path.get("namespace")) +
+ ":" + requireNonNull(path.get("documentType")) +
+ ":" + group.map(Group::docIdPart).orElse("") +
+ ":" + requireNonNull(path.get("docid")));
+ }
+
+ String documentType() { return requireNonNull(path.get("documentType")); }
+ String namespace() { return requireNonNull(path.get("namespace")); }
+ Optional<Group> group() { return group; }
+
+ }
+
+ private static Optional<String> getProperty(HttpRequest request, String name) {
+ List<String> values = request.parameters().get(name);
+ if (values != null && values.size() != 0)
+ return Optional.ofNullable(values.get(values.size() - 1));
+
+ return Optional.empty();
+ }
+
+ private static <T> Optional<T> getProperty(HttpRequest request, String name, Parser<T> parser) {
+ return getProperty(request, name).map(parser::parse);
+ }
+
+
+ @FunctionalInterface
+ interface Parser<T> extends Function<String, T> {
+ default T parse(String value) {
+ try {
+ return apply(value);
+ }
+ catch (RuntimeException e) {
+ throw new IllegalArgumentException("Failed parsing '" + value + "': " + Exceptions.toMessageString(e));
+ }
+ }
+ }
+
+
+ @FunctionalInterface
+ interface Handler {
+ ContentChannel handle(HttpRequest request, DocumentPath path, ResponseHandler handler);
+ }
+
+
+ /** Readable content channel which forwards data to a reader when closed. */
+ static class ForwardingContentChannel implements ContentChannel {
+
+ private final ReadableContentChannel delegate = new ReadableContentChannel();
+ private final Consumer<InputStream> reader;
+
+ public ForwardingContentChannel(Consumer<InputStream> reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void write(ByteBuffer buf, CompletionHandler handler) {
+ delegate.write(buf, handler);
+ }
+
+ @Override
+ public void close(CompletionHandler handler) {
+ delegate.close(handler);
+ try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) {
+ reader.accept(in);
+ }
+ }
+
+ }
+
+
+ private static class DocumentOperationParser {
+
+ private static final JsonFactory jsonFactory = new JsonFactory();
+
+ private final DocumentTypeManager manager;
+
+ DocumentOperationParser(DocumentmanagerConfig config) {
+ this.manager = new DocumentTypeManager(config);
+ }
+
+ DocumentPut parsePut(InputStream inputStream, String docId) {
+ return (DocumentPut) parse(inputStream, docId, DocumentParser.SupportedOperation.PUT);
+ }
+
+ DocumentUpdate parseUpdate(InputStream inputStream, String docId) {
+ return (DocumentUpdate) parse(inputStream, docId, DocumentParser.SupportedOperation.UPDATE);
+ }
+
+ private DocumentOperation parse(InputStream inputStream, String docId, DocumentParser.SupportedOperation operation) {
+ return new JsonReader(manager, inputStream, jsonFactory).readSingleDocument(operation, docId);
+ }
+
+ }
+
+ private class MeasuringResponseHandler implements ResponseHandler {
+
+ private final ResponseHandler delegate;
+ private final DocumentOperationType type;
+ private final Instant start;
+
+ private MeasuringResponseHandler(ResponseHandler delegate, DocumentOperationType type, Instant start) {
+ this.delegate = delegate;
+ this.type = type;
+ this.start = start;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ switch (response.getStatus() / 100) {
+ case 2: metrics.reportSuccessful(type, start); break;
+ case 4: metrics.reportFailure(type, DocumentOperationStatus.REQUEST_ERROR); break;
+ case 5: metrics.reportFailure(type, DocumentOperationStatus.SERVER_ERROR); break;
+ }
+ return delegate.handleResponse(response);
+ }
+
+ }
+
+}