diff options
11 files changed, 449 insertions, 122 deletions
diff --git a/document/abi-spec.json b/document/abi-spec.json index 4a94d7e55de..1b3ca23c0bc 100644 --- a/document/abi-spec.json +++ b/document/abi-spec.json @@ -552,6 +552,7 @@ "methods": [ "public void <init>(com.yahoo.document.DocumentType, com.yahoo.document.DocumentId)", "public void <init>(com.yahoo.document.serialization.DocumentUpdateReader)", + "public void <init>(com.yahoo.document.DocumentUpdate)", "public void <init>(com.yahoo.document.DocumentType, java.lang.String)", "public com.yahoo.document.DocumentId getId()", "public void setId(com.yahoo.document.DocumentId)", @@ -3479,7 +3480,6 @@ "public" ], "methods": [ - "public static java.lang.String replaceType(java.lang.String, java.lang.String)", "public static long makeLocation(java.lang.String)", "public void <init>(java.lang.String, java.lang.String, java.lang.String, java.lang.String)", "public long getLocation()", diff --git a/document/src/main/java/com/yahoo/document/DocumentUpdate.java b/document/src/main/java/com/yahoo/document/DocumentUpdate.java index 8de8ca6af53..5c748f48f15 100644 --- a/document/src/main/java/com/yahoo/document/DocumentUpdate.java +++ b/document/src/main/java/com/yahoo/document/DocumentUpdate.java @@ -76,6 +76,16 @@ public class DocumentUpdate extends DocumentOperation implements Iterable<FieldP reader.read(this); } + /** Creates a new document update which is a copy of the argument. */ + public DocumentUpdate(DocumentUpdate update) { + super(update); + docId = update.docId; + documentType = update.documentType; + id2FieldUpdates = new HashMap<>(update.id2FieldUpdates); + fieldPathUpdates = new ArrayList<>(update.fieldPathUpdates); + createIfNonExistent = update.createIfNonExistent; + } + /** * Creates a DocumentUpdate. * diff --git a/document/src/main/java/com/yahoo/document/idstring/IdIdString.java b/document/src/main/java/com/yahoo/document/idstring/IdIdString.java index 9c75cf6828b..bb09dff7a98 100644 --- a/document/src/main/java/com/yahoo/document/idstring/IdIdString.java +++ b/document/src/main/java/com/yahoo/document/idstring/IdIdString.java @@ -5,10 +5,7 @@ import com.yahoo.collections.MD5; import com.yahoo.text.Utf8; /** - * Created with IntelliJ IDEA. - * User: magnarn - * Date: 10/15/12 - * Time: 11:02 AM + * @author Magnar Nedland */ public class IdIdString extends IdString { private final String type; @@ -19,20 +16,12 @@ public class IdIdString extends IdString { private static final int SIZE_OF_ID_AND_3_COLONS = 2 + 3; // "id:::" private static final int MAX_LENGTH = IdString.MAX_LENGTH_EXCEPT_NAMESPACE_SPECIFIC - SIZE_OF_ID_AND_3_COLONS; - public static String replaceType(String id, String typeName) { - int typeStartPos = id.indexOf(":", 3) + 1; - int typeEndPos = id.indexOf(":", typeStartPos); - return id.substring(0, typeStartPos) + typeName + id.substring(typeEndPos); - } - - public static long makeLocation(String s) { long result = 0; byte[] md5sum = MD5.md5.get().digest(Utf8.toBytes(s)); - for (int i=0; i<8; ++i) { - result |= (md5sum[i] & 0xFFl) << (8*i); + for (int i = 0; i < 8; ++i) { + result |= (md5sum[i] & 0xFFL) << (8 * i); } - return result; } diff --git a/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java b/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java index a8fdb186bd7..d6d95ca0bc6 100644 --- a/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java +++ b/document/src/main/java/com/yahoo/document/json/document/DocumentParser.java @@ -33,7 +33,7 @@ public class DocumentParser { /** * Parses a single document and returns it. - * Returns empty is we have reached the end of the stream. + * Returns empty if we have reached the end of the stream. */ public Optional<DocumentParseInfo> parse(Optional<DocumentId> documentIdArg) throws IOException { indentLevel = 0; diff --git a/document/src/test/java/com/yahoo/document/IdIdStringTest.java b/document/src/test/java/com/yahoo/document/IdIdStringTest.java index 493fb35c97d..a4b05d0cf7a 100644 --- a/document/src/test/java/com/yahoo/document/IdIdStringTest.java +++ b/document/src/test/java/com/yahoo/document/IdIdStringTest.java @@ -109,9 +109,4 @@ public class IdIdStringTest { } } - @Test - public void requireThatIdIdStringCanReplaceType() { - String type = IdIdString.replaceType("id:namespace:type::foo", "newType"); - assertEquals("id:namespace:newType::foo", type); - } } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java b/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java index 963710d6c37..b24d1b33a02 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/AckToken.java @@ -4,7 +4,7 @@ package com.yahoo.documentapi; /** * Token to use to acknowledge data for visiting. * - * @author <a href="mailto:thomasg@yahoo-inc.com">Thomas Gundersen</a> + * @author Thomas Gundersen */ public class AckToken { diff --git a/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java b/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java index 0471c44cb14..cab591d2dc7 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/ResponseHandler.java @@ -12,5 +12,5 @@ public interface ResponseHandler { * * @param response The response to process. */ - public void handleResponse(Response response); + void handleResponse(Response response); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java index 0a4fb6fa0c8..0874ffb64fc 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/VisitorControlSession.java @@ -13,6 +13,7 @@ package com.yahoo.documentapi; * @author Håkon Humberset */ public interface VisitorControlSession { + /** * Acknowledges a response previously retrieved by the <code>getNext</code> * method. @@ -20,19 +21,19 @@ public interface VisitorControlSession { * @param token The ack token. You must get this from the visitor response * returned by the <code>getNext</code> method. */ - public void ack(AckToken token); + void ack(AckToken token); /** * Aborts the session. */ - public void abort(); + void abort(); /** * Returns the next response of this session. This method returns immediately. * * @return the next response, or null if no response is ready at this time */ - public VisitorResponse getNext(); + VisitorResponse getNext(); /** * Returns the next response of this session. This will block until a response is ready @@ -43,11 +44,11 @@ public interface VisitorControlSession { * @return the next response, or null if no response becomes ready before the timeout expires * @throws InterruptedException if this thread is interrupted while waiting */ - public VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException; + VisitorResponse getNext(int timeoutMilliseconds) throws InterruptedException; /** * Destroys this session and frees up any resources it has held. */ - public void destroy(); + void destroy(); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java index 982a1c50b85..675f20e3807 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/MessageBusVisitorSession.java @@ -772,8 +772,8 @@ public class MessageBusVisitorSession implements VisitorSession { } private void handleMessageProcessingException(Reply reply, Exception e, String what) { - final String errorDesc = formatProcessingException(e, what); - final String fullMsg = formatIdentifyingVisitorErrorString(errorDesc); + String errorDesc = formatProcessingException(e, what); + String fullMsg = formatIdentifyingVisitorErrorString(errorDesc); log.log(Level.SEVERE, fullMsg, e); int errorCode; synchronized (progress.getToken()) { diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 1fefe2e0c7e..4b49c5cdafb 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -20,23 +20,27 @@ import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.document.fieldset.AllFields; +import com.yahoo.document.fieldset.DocIdOnly; +import com.yahoo.document.idstring.IdIdString; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; import com.yahoo.document.json.JsonWriter; import com.yahoo.document.restapi.DocumentOperationExecutorConfig; import com.yahoo.document.select.parser.ParseException; +import com.yahoo.documentapi.AckToken; import com.yahoo.documentapi.AsyncParameters; import com.yahoo.documentapi.AsyncSession; import com.yahoo.documentapi.DocumentAccess; import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; -import com.yahoo.documentapi.DumpVisitorDataHandler; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.VisitorControlHandler; +import com.yahoo.documentapi.VisitorDataHandler; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorSession; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.metrics.DocumentApiMetrics; import com.yahoo.documentapi.metrics.DocumentOperationStatus; import com.yahoo.jdisc.Metric; @@ -51,6 +55,7 @@ import com.yahoo.jdisc.handler.ResponseHandler; import com.yahoo.jdisc.handler.UnsafeContentInputStream; import com.yahoo.jdisc.http.HttpRequest; import com.yahoo.jdisc.http.HttpRequest.Method; +import com.yahoo.messagebus.Message; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; import com.yahoo.messagebus.TraceNode; @@ -60,6 +65,7 @@ import com.yahoo.search.query.ParameterParser; import com.yahoo.text.Text; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import com.yahoo.yolean.Exceptions; +import com.yahoo.yolean.Exceptions.RunnableThrowingIOException; import java.io.IOException; import java.io.InputStream; @@ -79,13 +85,16 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; +import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -145,6 +154,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String BUCKET_SPACE = "bucketSpace"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; + private static final String DESTINATION = "destination"; private final Clock clock; private final Metric metric; @@ -155,9 +165,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final AsyncSession asyncSession; private final Map<String, StorageCluster> clusters; private final Deque<Operation> operations; + private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>(); private final AtomicLong enqueued = new AtomicLong(); + private final AtomicLong outstanding = new AtomicLong(); private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); + private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); + private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject @@ -184,10 +197,14 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.asyncSession = access.createAsyncSession(new AsyncParameters()); this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig); this.operations = new ConcurrentLinkedDeque<>(); - this.executor.scheduleWithFixedDelay(this::dispatchEnqueued, - executorConfig.resendDelayMillis(), - executorConfig.resendDelayMillis(), - TimeUnit.MILLISECONDS); + this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, + executorConfig.resendDelayMillis(), + executorConfig.resendDelayMillis(), + TimeUnit.MILLISECONDS); + this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, + executorConfig.resendDelayMillis(), + executorConfig.resendDelayMillis(), + TimeUnit.MILLISECONDS); } // ------------------------------------------------ Requests ------------------------------------------------- @@ -236,24 +253,44 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void destroy() { - executor.shutdown(); - Instant doom = clock.instant().plus(Duration.ofSeconds(20)); - while ( ! operations.isEmpty() && clock.instant().isBefore(doom)) + Instant doom = clock.instant().plus(Duration.ofSeconds(30)); + + // This blocks until all visitors are done. These, in turn, may require the asyncSession to be alive + // to be able to run, as well as dispatch of operations against it, which is done by visitDispatcher. + visits.values().forEach(VisitorSession::destroy); + + // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty. + dispatcher.shutdown(); + visitDispatcher.shutdown(); + while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { dispatchEnqueued(); + dispatchVisitEnqueued(); + } if ( ! operations.isEmpty()) log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left"); - asyncSession.destroy(); - visits.values().forEach(VisitorSession::destroy); + if ( ! visitOperations.isEmpty()) + log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left"); try { - if ( ! executor.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) - executor.shutdownNow(); + while (outstanding.get() > 0 && clock.instant().isBefore(doom)) + Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis())); + + if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + dispatcher.shutdownNow(); + + if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + visitDispatcher.shutdownNow(); } catch (InterruptedException e) { log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down"); } + finally { + asyncSession.destroy(); + if (outstanding.get() != 0) + log.log(WARNING, "Failed to receive a response to " + outstanding.get() + " outstanding document operations during shutdown"); + } } @FunctionalInterface @@ -266,16 +303,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>(); handlers.put("/document/v1/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/{*}", Map.of(GET, this::getDocument, @@ -302,7 +350,55 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { enqueueAndDispatch(request, handler, () -> { VisitorParameters parameters = parseParameters(request, path); return () -> { - visit(request, parameters, handler); + visitAndWrite(request, parameters, handler); + return true; // VisitorSession has its own throttle handling. + }; + }); + return ignoredContent; + } + + private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + enqueueAndDispatch(request, handler, () -> { + VisitorParameters parameters = parseParameters(request, path); + parameters.setRemoteDataHandler(getProperty(request, DESTINATION).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + DESTINATION + "'"))); + return () -> { + visitWithRemote(request, parameters, handler); + return true; // VisitorSession has its own throttle handling. + }; + }); + return ignoredContent; + } + + private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + if (getProperty(request, SELECTION).isEmpty()) + throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); + + return new ForwardingContentChannel(in -> { + enqueueAndDispatch(request, handler, () -> { + String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); + IdIdString dummyId = new IdIdString("dummy", type, "", ""); + VisitorParameters parameters = parseParameters(request, path); + parameters.setFieldSet(DocIdOnly.NAME); + DocumentUpdate update = parser.parseUpdate(in, dummyId.toString()); + update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection())); + return () -> { + visitAndUpdate(request, parameters, handler, update, getProperty(request, DESTINATION)); + return true; // VisitorSession has its own throttle handling. + }; + }); + }); + } + + private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + if (getProperty(request, SELECTION).isEmpty()) + throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); + + enqueueAndDispatch(request, handler, () -> { + VisitorParameters parameters = parseParameters(request, path); + parameters.setFieldSet(DocIdOnly.NAME); + TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection()); + return () -> { + visitAndDelete(request, parameters, handler, condition, getProperty(request, DESTINATION)); return true; // VisitorSession has its own throttle handling. }; }); @@ -315,6 +411,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (rawParameters.fieldSet().isEmpty()) rawParameters = rawParameters.withFieldSet(path.documentType().orElseThrow() + ":[document]"); DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> { + outstanding.decrementAndGet(); handle(path, handler, response, (document, jsonResponse) -> { if (document != null) { jsonResponse.writeSingleDocument(document); @@ -336,7 +433,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentPut put = parser.parsePut(in, path.id().toString()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.put(put, parameters)); }); }); @@ -350,7 +450,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.update(update, parameters)); }); }); @@ -362,7 +465,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentRemove remove = new DocumentRemove(path.id()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(remove::setCondition); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters)); }); return ignoredContent; @@ -415,18 +521,41 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return false; } + /** Dispatches enqueued requests until one is blocked. */ + void dispatchVisitEnqueued() { + try { + while (dispatchFirstVisit()); + } + catch (Exception e) { + log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e); + } + } + + /** Attempts to dispatch the first enqueued visit operations, and returns whether this was successful. */ + private boolean dispatchFirstVisit() { + BooleanSupplier operation = visitOperations.poll(); + if (operation == null) + return false; + + if (operation.getAsBoolean()) + return true; + + visitOperations.push(operation); + return false; + } + /** * Enqueues the given request and operation, or responds with "overload" if the queue is full, * and then attempts to dispatch an enqueued operation from the head of the queue. */ - private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Supplier<Boolean>> operationParser) { + private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<BooleanSupplier> operationParser) { if (enqueued.incrementAndGet() > maxThrottled) { enqueued.decrementAndGet(); overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler); return; } operations.offer(new Operation(request, handler) { - @Override Supplier<Boolean> parse() { return operationParser.get(); } + @Override BooleanSupplier parse() { return operationParser.get(); } }); dispatchFirst(); } @@ -548,7 +677,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { json.writeArrayFieldStart("documents"); } - synchronized void writeDocumentValue(Document document) throws IOException { + synchronized void writeDocumentValue(Document document) { new JsonWriter(json).write(document); } @@ -619,7 +748,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private static void loggingException(Exceptions.RunnableThrowingIOException runnable) { + private static void loggingException(RunnableThrowingIOException runnable) { try { runnable.run(); } @@ -628,14 +757,14 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } } - // ---------------------------------------------Document Operations ---------------------------------------- + // -------------------------------------------- Document Operations ---------------------------------------- private static abstract class Operation { private final Lock lock = new ReentrantLock(); private final HttpRequest request; private final ResponseHandler handler; - private Supplier<Boolean> operation; + private BooleanSupplier operation; Operation(HttpRequest request, ResponseHandler handler) { this.request = request; @@ -644,7 +773,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { /** * Attempts to dispatch this operation to the document API, and returns whether this completed or not. - * This return {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if + * Returns {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if * dispatch should be retried at a later time. */ boolean dispatch() { @@ -658,7 +787,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (operation == null) operation = parse(); - return operation.get(); + return operation.getAsBoolean(); } catch (IllegalArgumentException e) { badRequest(request, e, handler); @@ -672,12 +801,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return true; } - abstract Supplier<Boolean> parse(); + abstract BooleanSupplier parse(); } - /** Attempts to send the given document operation, returning false if thes needs to be retried. */ - private static boolean dispatchOperation(Supplier<Result> documentOperation) { + /** Attempts to send the given document operation, returning false if this needs to be retried. */ + private boolean dispatchOperation(Supplier<Result> documentOperation) { Result result = documentOperation.get(); if (result.type() == Result.ResultType.TRANSIENT_ERROR) return false; @@ -685,6 +814,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (result.type() == Result.ResultType.FATAL_ERROR) throw new RuntimeException(result.getError()); + outstanding.incrementAndGet(); return true; } @@ -824,27 +954,108 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { path.documentType(), List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), getProperty(request, BUCKET_SPACE))); + return parameters; } - private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + private interface VisitCallback { + /** Called at the start of response rendering. */ + default void onStart(JsonResponse response) throws IOException { } + + /** Called for every document received from backend visitors — must call the ack for these to proceed. */ + default void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { } + + /** Called at the end of response rendering, before generic status data is written. */ + default void onEnd(JsonResponse response) throws IOException { } + } + + private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + TestAndSetCondition condition, Optional<String> route) { + visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + DocumentRemove remove = new DocumentRemove(id); + remove.setCondition(condition); + return asyncSession.remove(remove, operationParameters); + }); + } + + private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + DocumentUpdate protoUpdate, Optional<String> route) { + visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + DocumentUpdate update = new DocumentUpdate(protoUpdate); + update.setId(id); + return asyncSession.update(update, operationParameters); + }); + } + + private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { + visit(request, parameters, handler, new VisitCallback() { + @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters() + : parameters().withRoute(route.get())) + .withResponseHandler(operationResponse -> { + outstanding.decrementAndGet(); + switch (operationResponse.outcome()) { + case SUCCESS: + case NOT_FOUND: + case CONDITION_FAILED: + break; // This is all OK — the latter two are due to mitigating races. + case ERROR: + case INSUFFICIENT_STORAGE: + onError.accept(operationResponse.getTextMessage()); + break; + default: + onError.accept("Unexpected response " + operationResponse); + } + }); + visitOperations.offer(() -> { + Result result = operation.apply(document.getId(), operationParameters); + if (result.type() == Result.ResultType.TRANSIENT_ERROR) + return false; + + if (result.type() == Result.ResultType.FATAL_ERROR) + onError.accept(result.getError().getMessage()); + else + outstanding.incrementAndGet(); + + ack.run(); + return true; + }); + dispatchFirstVisit(); + } + }); + } + + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + visit(request, parameters, handler, new VisitCallback() { + @Override public void onStart(JsonResponse response) throws IOException { + response.writeDocumentsArrayStart(); + } + @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + response.writeDocumentValue(document); + ack.run(); + } + @Override public void onEnd(JsonResponse response) throws IOException { + response.writeArrayEnd(); + } + }); + } + + private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + visit(request, parameters, handler, new VisitCallback() { }); + } + + private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); - response.writeDocumentsArrayStart(); - CountDownLatch latch = new CountDownLatch(1); - parameters.setLocalDataHandler(new DumpVisitorDataHandler() { - @Override public void onDocument(Document doc, long timeStamp) { - loggingException(() -> { - response.writeDocumentValue(doc); - }); - } - @Override public void onRemove(DocumentId id) { } // We don't visit removes. - }); - parameters.setControlHandler(new VisitorControlHandler() { + Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. + AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. + callback.onStart(response); + VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { super.onDone(code, message); loggingException(() -> { - response.writeArrayEnd(); // Close "documents" array. + callback.onEnd(response); switch (code) { case TIMEOUT: if ( ! hasVisitedAnyBuckets()) { @@ -855,29 +1066,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } case SUCCESS: // Intentional fallthrough. case ABORTED: // Intentional fallthrough. - if (getProgress() != null && ! getProgress().isFinished()) - response.writeContinuation(getProgress().serializeToString()); + if (error.get() == null) { + if (getProgress() != null && ! getProgress().isFinished()) + response.writeContinuation(getProgress().serializeToString()); - response.respond(Response.Status.OK); - break; + response.respond(Response.Status.OK); + break; + } default: - response.writeMessage(message != null ? message : "Visiting failed"); + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); response.respond(Response.Status.INTERNAL_SERVER_ERROR); } - executor.execute(() -> { - try { - latch.await(); // We may get here while dispatching thread is still putting us in the map. - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + visitDispatcher.execute(() -> { + phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); }); } - }); - visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); - latch.countDown(); + }; + if (parameters.getRemoteDataHandler() == null) { + parameters.setLocalDataHandler(new VisitorDataHandler() { + @Override public void onMessage(Message m, AckToken token) { + if (m instanceof PutDocumentMessage) + callback.onDocument(response, + ((PutDocumentMessage) m).getDocumentPut().getDocument(), + () -> ack(token), + errorMessage -> { + error.set(errorMessage); + controller.abort(); + }); + else + throw new UnsupportedOperationException("Only PutDocumentMessage is supported, but got a " + m.getClass()); + } + }); + } + parameters.setControlHandler(controller); + visits.put(controller, access.createVisitorSession(parameters)); + phaser.arriveAndDeregister(); } catch (ParseException e) { badRequest(request, new IllegalArgumentException(e), handler); @@ -889,6 +1114,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ------------------------------------------------ Helpers ------------------------------------------------ + /** Returns the last property with the given name, if present, or throws if this is empty or blank. */ private static Optional<String> getProperty(HttpRequest request, String name) { if ( ! request.parameters().containsKey(name)) return Optional.empty(); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 449daa4970a..e5e5fef5fd0 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -25,9 +25,9 @@ import com.yahoo.documentapi.DocumentAccessParams; import com.yahoo.documentapi.DocumentIdResponse; import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; -import com.yahoo.documentapi.DumpVisitorDataHandler; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.ResponseHandler; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.SubscriptionParameters; import com.yahoo.documentapi.SubscriptionSession; @@ -40,6 +40,7 @@ import com.yahoo.documentapi.VisitorDestinationSession; import com.yahoo.documentapi.VisitorParameters; import com.yahoo.documentapi.VisitorResponse; import com.yahoo.documentapi.VisitorSession; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.jdisc.Metric; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.Trace; @@ -59,14 +60,17 @@ import org.junit.Test; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.OptionalInt; +import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -172,8 +176,9 @@ public class DocumentV1ApiTest { } @Test - public void testResponses() throws ExecutionException, InterruptedException { + public void testResponses() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); + List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null)); // GET at non-existent path returns 404 with available paths var response = driver.sendRequest("http://localhost/document/v1/not-found"); assertSameJson("{" + @@ -191,6 +196,7 @@ public class DocumentV1ApiTest { assertEquals(404, response.getStatus()); // GET at root is a visit. Numeric parameters have an upper bound. + access.expect(tokens); access.expect(parameters -> { assertEquals("[Content:cluster=content]", parameters.getRoute().toString()); assertEquals("default", parameters.getBucketSpace()); @@ -200,9 +206,9 @@ public class DocumentV1ApiTest { assertEquals("(all the things)", parameters.getDocumentSelection()); assertEquals(1000, parameters.getSessionTimeoutMs()); // Put some documents in the response - ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc1, 0); - ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc2, 0); - ((DumpVisitorDataHandler) parameters.getLocalDataHandler()).onDocument(doc3, 0); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc1)), tokens.get(0)); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(1)); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2)); VisitorStatistics statistics = new VisitorStatistics(); statistics.setBucketsVisited(1); parameters.getControlHandler().onVisitorStatistics(statistics); @@ -246,6 +252,102 @@ public class DocumentV1ApiTest { "}", response.readAll()); assertEquals(400, response.getStatus()); + // POST with namespace and document type is a restricted visit with a required remote data handler ("destination") + access.expect(parameters -> { + fail("Not supposed to run"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid", POST); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"message\": \"Missing required property 'destination'\"" + + "}", response.readAll()); + assertEquals(400, response.getStatus()); + + // POST with namespace and document type is a restricted visit with a require remote data handler ("destination") + access.expect(parameters -> { + assertEquals("zero", parameters.getRemoteDataHandler()); + assertEquals("music:[document]", parameters.fieldSet()); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "We made it!"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?destination=zero", POST); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"" + + "}", response.readAll()); + assertEquals(200, response.getStatus()); + + // PUT with namespace and document type is a restricted visit with a required partial update to apply to visited documents. + access.expect(tokens.subList(2, 3)); + access.expect(parameters -> { + assertEquals("(true) and (music) and (id.namespace=='space')", parameters.getDocumentSelection()); + assertEquals("[id]", parameters.fieldSet()); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2)); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "Huzzah!"); + }); + access.session.expect((update, parameters) -> { + DocumentUpdate expectedUpdate = new DocumentUpdate(doc3.getDataType(), doc3.getId()); + expectedUpdate.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl"))); + expectedUpdate.setCondition(new TestAndSetCondition("(true) and (music) and (id.namespace=='space')")); + assertEquals(expectedUpdate, update); + parameters.responseHandler().get().handleResponse(new UpdateResponse(0, false)); + assertEquals(parameters().withRoute("zero"), parameters); + return new Result(Result.ResultType.SUCCESS, null); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=true&destination=zero", PUT, + "{" + + " \"fields\": {" + + " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" + + " }" + + "}"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"" + + "}", response.readAll()); + assertEquals(200, response.getStatus()); + + // PUT with namespace, document type and group is also a restricted visit which requires a selection. + access.expect(parameters -> { + fail("Not supposed to run"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/group/troupe", PUT); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/group/troupe\"," + + " \"message\": \"Missing required property 'selection'\"" + + "}", response.readAll()); + assertEquals(400, response.getStatus()); + + // DELETE with namespace and document type is a restricted visit which deletes visited documents. + access.expect(tokens.subList(0, 1)); + access.expect(parameters -> { + assertEquals("(false) and (music) and (id.namespace=='space')", parameters.getDocumentSelection()); + assertEquals("[id]", parameters.fieldSet()); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(0)); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Huzzah?"); + }); + access.session.expect((remove, parameters) -> { + DocumentRemove expectedRemove = new DocumentRemove(doc2.getId()); + expectedRemove.setCondition(new TestAndSetCondition("(false) and (music) and (id.namespace=='space')")); + assertEquals(new DocumentRemove(doc2.getId()), remove); + assertEquals(parameters().withRoute("zero"), parameters); + parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId(), "boom", Response.Outcome.ERROR)); + return new Result(Result.ResultType.SUCCESS, null); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=false&destination=zero", DELETE); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"message\": \"boom\"" + + "}", response.readAll()); + assertEquals(500, response.getStatus()); + + // DELETE at the root is also a deletion visit. These require a selection. + access.expect(parameters -> { + fail("Not supposed to run"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid", DELETE); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"message\": \"Missing required property 'selection'\"" + + "}", response.readAll()); + assertEquals(400, response.getStatus()); + // GET with namespace, document type and group is a restricted visit. access.expect(parameters -> { assertEquals("(music) and (id.namespace=='space') and (id.group=='best\\'')", parameters.getDocumentSelection()); @@ -419,7 +521,7 @@ public class DocumentV1ApiTest { DocumentRemove expectedRemove = new DocumentRemove(doc2.getId()); expectedRemove.setCondition(new TestAndSetCondition("false")); assertEquals(new DocumentRemove(doc2.getId()), remove); - assertEquals(parameters.withRoute("route"), parameters); + assertEquals(parameters().withRoute("route"), parameters); parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId())); return new Result(Result.ResultType.SUCCESS, null); }); @@ -513,27 +615,20 @@ public class DocumentV1ApiTest { "}", response2.readAll()); assertEquals(500, response2.getStatus()); - // Request timeout is dispatched after timeout has passed. - CountDownLatch latch = new CountDownLatch(1); - var assertions = Executors.newSingleThreadExecutor().submit(() -> { - access.session.expect((id, parameters) -> { - try { - latch.await(); - } - catch (InterruptedException e) { - fail("Not supposed to be interrupted"); - } - return new Result(Result.ResultType.SUCCESS, null); - }); - var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=content&fieldSet=go&timeout=1ms"); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/docid/one\"," + - " \"message\": \"Request timeout after 1ms\"" + - "}", response4.readAll()); - assertEquals(504, response4.getStatus()); + // Request response does not arrive before timeout has passed. + AtomicReference<ResponseHandler> handler = new AtomicReference<>(); + access.session.expect((id, parameters) -> { + handler.set(parameters.responseHandler().get()); + return new Result(Result.ResultType.SUCCESS, null); }); - latch.countDown(); - assertions.get(); + var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?timeout=1ms"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid/one\"," + + " \"message\": \"Request timeout after 1ms\"" + + "}", response4.readAll()); + assertEquals(504, response4.getStatus()); + if (handler.get() != null) // Timeout may have occurred before dispatch, or ... + handler.get().handleResponse(new Response(0)); // response may eventually arrive, but too late. driver.close(); } @@ -542,6 +637,7 @@ public class DocumentV1ApiTest { static class MockDocumentAccess extends DocumentAccess { private final AtomicReference<Consumer<VisitorParameters>> expectations = new AtomicReference<>(); + private final Set<AckToken> outstanding = new CopyOnWriteArraySet<>(); private final MockAsyncSession session = new MockAsyncSession(); MockDocumentAccess(DocumentmanagerConfig config) { @@ -560,18 +656,24 @@ public class DocumentV1ApiTest { @Override public VisitorSession createVisitorSession(VisitorParameters parameters) { - expectations.get().accept(parameters); - return new VisitorSession() { + VisitorSession visitorSession = new VisitorSession() { + { + parameters.getControlHandler().setSession(this); + if (parameters.getLocalDataHandler() != null) + parameters.getLocalDataHandler().setSession(this); + } @Override public boolean isDone() { return false; } @Override public ProgressToken getProgress() { return null; } @Override public Trace getTrace() { return null; } @Override public boolean waitUntilDone(long timeoutMs) { return false; } - @Override public void ack(AckToken token) { } + @Override public void ack(AckToken token) { assertTrue(outstanding.remove(token)); } @Override public void abort() { } @Override public VisitorResponse getNext() { return null; } @Override public VisitorResponse getNext(int timeoutMilliseconds) { return null; } - @Override public void destroy() { } + @Override public void destroy() { assertEquals(Set.of(), outstanding); } }; + expectations.get().accept(parameters); + return visitorSession; } @Override @@ -593,6 +695,10 @@ public class DocumentV1ApiTest { this.expectations.set(expectations); } + public void expect(Collection<AckToken> tokens) { + outstanding.addAll(tokens); + } + } |