diff options
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 210 |
1 files changed, 197 insertions, 13 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 48e5f37a4f8..a10d9802e14 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -20,6 +20,7 @@ import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.document.fieldset.AllFields; +import com.yahoo.document.fieldset.DocIdOnly; import com.yahoo.document.idstring.IdIdString; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; @@ -92,6 +93,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -152,6 +154,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String BUCKET_SPACE = "bucketSpace"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; + private static final String DESTINATION = "destination"; private final Clock clock; private final Metric metric; @@ -162,9 +165,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final AsyncSession asyncSession; private final Map<String, StorageCluster> clusters; private final Deque<Operation> operations; + private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>(); private final AtomicLong enqueued = new AtomicLong(); + private final AtomicLong outstanding = new AtomicLong(); private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); + private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject @@ -195,6 +201,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), TimeUnit.MILLISECONDS); + this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, + executorConfig.resendDelayMillis(), + executorConfig.resendDelayMillis(), + TimeUnit.MILLISECONDS); } // ------------------------------------------------ Requests ------------------------------------------------- @@ -243,24 +253,44 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void destroy() { + Instant doom = clock.instant().plus(Duration.ofSeconds(30)); + + // This blocks until all visitors are done. These, in turn, may require the asyncSession to be alive + // to be able to run, as well as dispatch of operations against it, which is done by visitDispatcher. + visits.values().forEach(VisitorSession::destroy); + + // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty. dispatcher.shutdown(); - Instant doom = clock.instant().plus(Duration.ofSeconds(20)); - while ( ! operations.isEmpty() && clock.instant().isBefore(doom)) + visitDispatcher.shutdown(); + while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { dispatchEnqueued(); + dispatchVisitEnqueued(); + } if ( ! operations.isEmpty()) log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left"); - asyncSession.destroy(); - visits.values().forEach(VisitorSession::destroy); + if ( ! visitOperations.isEmpty()) + log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left"); try { + while (outstanding.get() > 0 && clock.instant().isBefore(doom)) + Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis())); + if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) dispatcher.shutdownNow(); + + if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + visitDispatcher.shutdownNow(); } catch (InterruptedException e) { log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down"); } + finally { + asyncSession.destroy(); + if (outstanding.get() != 0) + log.log(WARNING, "Failed to receive a response to " + outstanding.get() + " outstanding document operations during shutdown"); + } } @FunctionalInterface @@ -273,16 +303,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>(); handlers.put("/document/v1/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/{*}", Map.of(GET, this::getDocument, @@ -316,12 +357,61 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return ignoredContent; } + private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + enqueueAndDispatch(request, handler, () -> { + VisitorParameters parameters = parseParameters(request, path); + parameters.setRemoteDataHandler(getProperty(request, DESTINATION).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + DESTINATION + "'"))); + return () -> { + visitWithRemote(request, parameters, handler); + return true; // VisitorSession has its own throttle handling. + }; + }); + return ignoredContent; + } + + private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + if (getProperty(request, SELECTION).isEmpty()) + throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); + + return new ForwardingContentChannel(in -> { + enqueueAndDispatch(request, handler, () -> { + String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); + IdIdString dummyId = new IdIdString("dummy", type, "", ""); + VisitorParameters parameters = parseParameters(request, path); + parameters.setFieldSet(DocIdOnly.NAME); + DocumentUpdate update = parser.parseUpdate(in, dummyId.toString()); + update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection())); + return () -> { + visitAndUpdate(request, parameters, handler, update, getProperty(request, DESTINATION)); + return true; // VisitorSession has its own throttle handling. + }; + }); + }); + } + + private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + enqueueAndDispatch(request, handler, () -> { + if (getProperty(request, SELECTION).isEmpty()) + throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); + + VisitorParameters parameters = parseParameters(request, path); + parameters.setFieldSet(DocIdOnly.NAME); + TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection()); + return () -> { + visitAndDelete(request, parameters, handler, condition, getProperty(request, DESTINATION)); + return true; // VisitorSession has its own throttle handling. + }; + }); + return ignoredContent; + } + private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { DocumentOperationParameters rawParameters = parametersFromRequest(request, CLUSTER, FIELD_SET); if (rawParameters.fieldSet().isEmpty()) rawParameters = rawParameters.withFieldSet(path.documentType().orElseThrow() + ":[document]"); DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> { + outstanding.decrementAndGet(); handle(path, handler, response, (document, jsonResponse) -> { if (document != null) { jsonResponse.writeSingleDocument(document); @@ -343,7 +433,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentPut put = parser.parsePut(in, path.id().toString()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.put(put, parameters)); }); }); @@ -357,7 +450,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.update(update, parameters)); }); }); @@ -369,7 +465,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentRemove remove = new DocumentRemove(path.id()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(remove::setCondition); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters)); }); return ignoredContent; @@ -422,6 +521,29 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return false; } + /** Dispatches enqueued requests until one is blocked. */ + void dispatchVisitEnqueued() { + try { + while (dispatchFirstVisit()); + } + catch (Exception e) { + log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e); + } + } + + /** Attempts to dispatch the first enqueued visit operations, and returns whether this was successful. */ + private boolean dispatchFirstVisit() { + BooleanSupplier operation = visitOperations.poll(); + if (operation == null) + return false; + + if (operation.getAsBoolean()) + return true; + + visitOperations.push(operation); + return false; + } + /** * Enqueues the given request and operation, or responds with "overload" if the queue is full, * and then attempts to dispatch an enqueued operation from the head of the queue. @@ -684,7 +806,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } /** Attempts to send the given document operation, returning false if this needs to be retried. */ - private static boolean dispatchOperation(Supplier<Result> documentOperation) { + private boolean dispatchOperation(Supplier<Result> documentOperation) { Result result = documentOperation.get(); if (result.type() == Result.ResultType.TRANSIENT_ERROR) return false; @@ -692,6 +814,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (result.type() == Result.ResultType.FATAL_ERROR) throw new RuntimeException(result.getError()); + outstanding.incrementAndGet(); return true; } @@ -846,6 +969,63 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { default void onEnd(JsonResponse response) throws IOException { } } + private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + TestAndSetCondition condition, Optional<String> route) { + visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + DocumentRemove remove = new DocumentRemove(id); + remove.setCondition(condition); + return asyncSession.remove(remove, operationParameters); + }); + } + + private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + DocumentUpdate protoUpdate, Optional<String> route) { + visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + DocumentUpdate update = new DocumentUpdate(protoUpdate); + update.setId(id); + return asyncSession.update(update, operationParameters); + }); + } + + private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { + visit(request, parameters, handler, new VisitCallback() { + @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters() + : parameters().withRoute(route.get())) + .withResponseHandler(operationResponse -> { + outstanding.decrementAndGet(); + switch (operationResponse.outcome()) { + case SUCCESS: + case NOT_FOUND: + case CONDITION_FAILED: + break; // This is all OK — the latter two are due to mitigating races. + case ERROR: + case INSUFFICIENT_STORAGE: + onError.accept(operationResponse.getTextMessage()); + break; + default: + onError.accept("Unexpected response " + operationResponse); + } + }); + visitOperations.offer(() -> { + Result result = operation.apply(document.getId(), operationParameters); + if (result.type() == Result.ResultType.TRANSIENT_ERROR) + return false; + + if (result.type() == Result.ResultType.FATAL_ERROR) + onError.accept(result.getError().getMessage()); + else + outstanding.incrementAndGet(); + + ack.run(); + return true; + }); + dispatchFirstVisit(); + } + }); + } + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { visit(request, parameters, handler, new VisitCallback() { @Override public void onStart(JsonResponse response) throws IOException { @@ -861,6 +1041,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } + private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + visit(request, parameters, handler, new VisitCallback() { }); + } + private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); @@ -893,7 +1077,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); response.respond(Response.Status.INTERNAL_SERVER_ERROR); } - dispatcher.execute(() -> { + visitDispatcher.execute(() -> { phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); |