summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java210
1 files changed, 197 insertions, 13 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 48e5f37a4f8..a10d9802e14 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -20,6 +20,7 @@ import com.yahoo.document.FixedBucketSpaces;
import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.document.fieldset.AllFields;
+import com.yahoo.document.fieldset.DocIdOnly;
import com.yahoo.document.idstring.IdIdString;
import com.yahoo.document.json.DocumentOperationType;
import com.yahoo.document.json.JsonReader;
@@ -92,6 +93,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -152,6 +154,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private static final String BUCKET_SPACE = "bucketSpace";
private static final String TIMEOUT = "timeout";
private static final String TRACELEVEL = "tracelevel";
+ private static final String DESTINATION = "destination";
private final Clock clock;
private final Metric metric;
@@ -162,9 +165,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final AsyncSession asyncSession;
private final Map<String, StorageCluster> clusters;
private final Deque<Operation> operations;
+ private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>();
private final AtomicLong enqueued = new AtomicLong();
+ private final AtomicLong outstanding = new AtomicLong();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
+ private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-"));
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
@@ -195,6 +201,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
executorConfig.resendDelayMillis(),
executorConfig.resendDelayMillis(),
TimeUnit.MILLISECONDS);
+ this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued,
+ executorConfig.resendDelayMillis(),
+ executorConfig.resendDelayMillis(),
+ TimeUnit.MILLISECONDS);
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -243,24 +253,44 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override
public void destroy() {
+ Instant doom = clock.instant().plus(Duration.ofSeconds(30));
+
+ // This blocks until all visitors are done. These, in turn, may require the asyncSession to be alive
+ // to be able to run, as well as dispatch of operations against it, which is done by visitDispatcher.
+ visits.values().forEach(VisitorSession::destroy);
+
+ // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty.
dispatcher.shutdown();
- Instant doom = clock.instant().plus(Duration.ofSeconds(20));
- while ( ! operations.isEmpty() && clock.instant().isBefore(doom))
+ visitDispatcher.shutdown();
+ while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) {
dispatchEnqueued();
+ dispatchVisitEnqueued();
+ }
if ( ! operations.isEmpty())
log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left");
- asyncSession.destroy();
- visits.values().forEach(VisitorSession::destroy);
+ if ( ! visitOperations.isEmpty())
+ log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left");
try {
+ while (outstanding.get() > 0 && clock.instant().isBefore(doom))
+ Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis()));
+
if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
dispatcher.shutdownNow();
+
+ if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ visitDispatcher.shutdownNow();
}
catch (InterruptedException e) {
log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down");
}
+ finally {
+ asyncSession.destroy();
+ if (outstanding.get() != 0)
+ log.log(WARNING, "Failed to receive a response to " + outstanding.get() + " outstanding document operations during shutdown");
+ }
}
@FunctionalInterface
@@ -273,16 +303,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>();
handlers.put("/document/v1/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/docid/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ PUT, this::putDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ PUT, this::putDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/",
- Map.of(GET, this::getDocuments));
+ Map.of(GET, this::getDocuments,
+ POST, this::postDocuments,
+ PUT, this::putDocuments,
+ DELETE, this::deleteDocuments));
handlers.put("/document/v1/{namespace}/{documentType}/docid/{*}",
Map.of(GET, this::getDocument,
@@ -316,12 +357,61 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return ignoredContent;
}
+ private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ enqueueAndDispatch(request, handler, () -> {
+ VisitorParameters parameters = parseParameters(request, path);
+ parameters.setRemoteDataHandler(getProperty(request, DESTINATION).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + DESTINATION + "'")));
+ return () -> {
+ visitWithRemote(request, parameters, handler);
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
+ return ignoredContent;
+ }
+
+ private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ if (getProperty(request, SELECTION).isEmpty())
+ throw new IllegalArgumentException("Missing required property '" + SELECTION + "'");
+
+ return new ForwardingContentChannel(in -> {
+ enqueueAndDispatch(request, handler, () -> {
+ String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates"));
+ IdIdString dummyId = new IdIdString("dummy", type, "", "");
+ VisitorParameters parameters = parseParameters(request, path);
+ parameters.setFieldSet(DocIdOnly.NAME);
+ DocumentUpdate update = parser.parseUpdate(in, dummyId.toString());
+ update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection()));
+ return () -> {
+ visitAndUpdate(request, parameters, handler, update, getProperty(request, DESTINATION));
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
+ });
+ }
+
+ private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) {
+ enqueueAndDispatch(request, handler, () -> {
+ if (getProperty(request, SELECTION).isEmpty())
+ throw new IllegalArgumentException("Missing required property '" + SELECTION + "'");
+
+ VisitorParameters parameters = parseParameters(request, path);
+ parameters.setFieldSet(DocIdOnly.NAME);
+ TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection());
+ return () -> {
+ visitAndDelete(request, parameters, handler, condition, getProperty(request, DESTINATION));
+ return true; // VisitorSession has its own throttle handling.
+ };
+ });
+ return ignoredContent;
+ }
+
private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) {
enqueueAndDispatch(request, handler, () -> {
DocumentOperationParameters rawParameters = parametersFromRequest(request, CLUSTER, FIELD_SET);
if (rawParameters.fieldSet().isEmpty())
rawParameters = rawParameters.withFieldSet(path.documentType().orElseThrow() + ":[document]");
DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> {
+ outstanding.decrementAndGet();
handle(path, handler, response, (document, jsonResponse) -> {
if (document != null) {
jsonResponse.writeSingleDocument(document);
@@ -343,7 +433,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
DocumentPut put = parser.parsePut(in, path.id().toString());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
- .withResponseHandler(response -> handle(path, handler, response));
+ .withResponseHandler(response -> {
+ outstanding.decrementAndGet();
+ handle(path, handler, response);
+ });
return () -> dispatchOperation(() -> asyncSession.put(put, parameters));
});
});
@@ -357,7 +450,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition);
getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
- .withResponseHandler(response -> handle(path, handler, response));
+ .withResponseHandler(response -> {
+ outstanding.decrementAndGet();
+ handle(path, handler, response);
+ });
return () -> dispatchOperation(() -> asyncSession.update(update, parameters));
});
});
@@ -369,7 +465,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
DocumentRemove remove = new DocumentRemove(path.id());
getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(remove::setCondition);
DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE)
- .withResponseHandler(response -> handle(path, handler, response));
+ .withResponseHandler(response -> {
+ outstanding.decrementAndGet();
+ handle(path, handler, response);
+ });
return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters));
});
return ignoredContent;
@@ -422,6 +521,29 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
return false;
}
+ /** Dispatches enqueued requests until one is blocked. */
+ void dispatchVisitEnqueued() {
+ try {
+ while (dispatchFirstVisit());
+ }
+ catch (Exception e) {
+ log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e);
+ }
+ }
+
+ /** Attempts to dispatch the first enqueued visit operations, and returns whether this was successful. */
+ private boolean dispatchFirstVisit() {
+ BooleanSupplier operation = visitOperations.poll();
+ if (operation == null)
+ return false;
+
+ if (operation.getAsBoolean())
+ return true;
+
+ visitOperations.push(operation);
+ return false;
+ }
+
/**
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
@@ -684,7 +806,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
}
/** Attempts to send the given document operation, returning false if this needs to be retried. */
- private static boolean dispatchOperation(Supplier<Result> documentOperation) {
+ private boolean dispatchOperation(Supplier<Result> documentOperation) {
Result result = documentOperation.get();
if (result.type() == Result.ResultType.TRANSIENT_ERROR)
return false;
@@ -692,6 +814,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
if (result.type() == Result.ResultType.FATAL_ERROR)
throw new RuntimeException(result.getError());
+ outstanding.incrementAndGet();
return true;
}
@@ -846,6 +969,63 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
default void onEnd(JsonResponse response) throws IOException { }
}
+ private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
+ TestAndSetCondition condition, Optional<String> route) {
+ visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
+ DocumentRemove remove = new DocumentRemove(id);
+ remove.setCondition(condition);
+ return asyncSession.remove(remove, operationParameters);
+ });
+ }
+
+ private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
+ DocumentUpdate protoUpdate, Optional<String> route) {
+ visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> {
+ DocumentUpdate update = new DocumentUpdate(protoUpdate);
+ update.setId(id);
+ return asyncSession.update(update, operationParameters);
+ });
+ }
+
+ private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler,
+ Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) {
+ visit(request, parameters, handler, new VisitCallback() {
+ @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) {
+ DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters()
+ : parameters().withRoute(route.get()))
+ .withResponseHandler(operationResponse -> {
+ outstanding.decrementAndGet();
+ switch (operationResponse.outcome()) {
+ case SUCCESS:
+ case NOT_FOUND:
+ case CONDITION_FAILED:
+ break; // This is all OK — the latter two are due to mitigating races.
+ case ERROR:
+ case INSUFFICIENT_STORAGE:
+ onError.accept(operationResponse.getTextMessage());
+ break;
+ default:
+ onError.accept("Unexpected response " + operationResponse);
+ }
+ });
+ visitOperations.offer(() -> {
+ Result result = operation.apply(document.getId(), operationParameters);
+ if (result.type() == Result.ResultType.TRANSIENT_ERROR)
+ return false;
+
+ if (result.type() == Result.ResultType.FATAL_ERROR)
+ onError.accept(result.getError().getMessage());
+ else
+ outstanding.incrementAndGet();
+
+ ack.run();
+ return true;
+ });
+ dispatchFirstVisit();
+ }
+ });
+ }
+
private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
visit(request, parameters, handler, new VisitCallback() {
@Override public void onStart(JsonResponse response) throws IOException {
@@ -861,6 +1041,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
+ private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) {
+ visit(request, parameters, handler, new VisitCallback() { });
+ }
+
private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) {
try {
JsonResponse response = JsonResponse.create(request, handler);
@@ -893,7 +1077,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
response.respond(Response.Status.INTERNAL_SERVER_ERROR);
}
- dispatcher.execute(() -> {
+ visitDispatcher.execute(() -> {
phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});