diff options
Diffstat (limited to 'vespaclient-container-plugin')
2 files changed, 310 insertions, 35 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 48e5f37a4f8..a10d9802e14 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -20,6 +20,7 @@ import com.yahoo.document.FixedBucketSpaces; import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.document.fieldset.AllFields; +import com.yahoo.document.fieldset.DocIdOnly; import com.yahoo.document.idstring.IdIdString; import com.yahoo.document.json.DocumentOperationType; import com.yahoo.document.json.JsonReader; @@ -92,6 +93,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -152,6 +154,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private static final String BUCKET_SPACE = "bucketSpace"; private static final String TIMEOUT = "timeout"; private static final String TRACELEVEL = "tracelevel"; + private static final String DESTINATION = "destination"; private final Clock clock; private final Metric metric; @@ -162,9 +165,12 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final AsyncSession asyncSession; private final Map<String, StorageCluster> clusters; private final Deque<Operation> operations; + private final Deque<BooleanSupplier> visitOperations = new ConcurrentLinkedDeque<>(); private final AtomicLong enqueued = new AtomicLong(); + private final AtomicLong outstanding = new AtomicLong(); private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); + private final ScheduledExecutorService visitDispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-visit-")); private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject @@ -195,6 +201,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { executorConfig.resendDelayMillis(), executorConfig.resendDelayMillis(), TimeUnit.MILLISECONDS); + this.visitDispatcher.scheduleWithFixedDelay(this::dispatchVisitEnqueued, + executorConfig.resendDelayMillis(), + executorConfig.resendDelayMillis(), + TimeUnit.MILLISECONDS); } // ------------------------------------------------ Requests ------------------------------------------------- @@ -243,24 +253,44 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void destroy() { + Instant doom = clock.instant().plus(Duration.ofSeconds(30)); + + // This blocks until all visitors are done. These, in turn, may require the asyncSession to be alive + // to be able to run, as well as dispatch of operations against it, which is done by visitDispatcher. + visits.values().forEach(VisitorSession::destroy); + + // Shut down both dispatchers, so only we empty the queues of outstanding operations, and can be sure they're empty. dispatcher.shutdown(); - Instant doom = clock.instant().plus(Duration.ofSeconds(20)); - while ( ! operations.isEmpty() && clock.instant().isBefore(doom)) + visitDispatcher.shutdown(); + while ( ! (operations.isEmpty() && visitOperations.isEmpty()) && clock.instant().isBefore(doom)) { dispatchEnqueued(); + dispatchVisitEnqueued(); + } if ( ! operations.isEmpty()) log.log(WARNING, "Failed to empty request queue before shutdown timeout — " + operations.size() + " requests left"); - asyncSession.destroy(); - visits.values().forEach(VisitorSession::destroy); + if ( ! visitOperations.isEmpty()) + log.log(WARNING, "Failed to empty visitor operations queue before shutdown timeout — " + operations.size() + " operations left"); try { + while (outstanding.get() > 0 && clock.instant().isBefore(doom)) + Thread.sleep(Math.max(1, Duration.between(clock.instant(), doom).toMillis())); + if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) dispatcher.shutdownNow(); + + if ( ! visitDispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + visitDispatcher.shutdownNow(); } catch (InterruptedException e) { log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down"); } + finally { + asyncSession.destroy(); + if (outstanding.get() != 0) + log.log(WARNING, "Failed to receive a response to " + outstanding.get() + " outstanding document operations during shutdown"); + } } @FunctionalInterface @@ -273,16 +303,27 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { Map<String, Map<Method, Handler>> handlers = new LinkedHashMap<>(); handlers.put("/document/v1/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/group/{group}/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/number/{number}/", - Map.of(GET, this::getDocuments)); + Map.of(GET, this::getDocuments, + POST, this::postDocuments, + PUT, this::putDocuments, + DELETE, this::deleteDocuments)); handlers.put("/document/v1/{namespace}/{documentType}/docid/{*}", Map.of(GET, this::getDocument, @@ -316,12 +357,61 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return ignoredContent; } + private ContentChannel postDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + enqueueAndDispatch(request, handler, () -> { + VisitorParameters parameters = parseParameters(request, path); + parameters.setRemoteDataHandler(getProperty(request, DESTINATION).orElseThrow(() -> new IllegalArgumentException("Missing required property '" + DESTINATION + "'"))); + return () -> { + visitWithRemote(request, parameters, handler); + return true; // VisitorSession has its own throttle handling. + }; + }); + return ignoredContent; + } + + private ContentChannel putDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + if (getProperty(request, SELECTION).isEmpty()) + throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); + + return new ForwardingContentChannel(in -> { + enqueueAndDispatch(request, handler, () -> { + String type = path.documentType().orElseThrow(() -> new IllegalStateException("Document type must be specified for mass updates")); + IdIdString dummyId = new IdIdString("dummy", type, "", ""); + VisitorParameters parameters = parseParameters(request, path); + parameters.setFieldSet(DocIdOnly.NAME); + DocumentUpdate update = parser.parseUpdate(in, dummyId.toString()); + update.setCondition(new TestAndSetCondition(parameters.getDocumentSelection())); + return () -> { + visitAndUpdate(request, parameters, handler, update, getProperty(request, DESTINATION)); + return true; // VisitorSession has its own throttle handling. + }; + }); + }); + } + + private ContentChannel deleteDocuments(HttpRequest request, DocumentPath path, ResponseHandler handler) { + enqueueAndDispatch(request, handler, () -> { + if (getProperty(request, SELECTION).isEmpty()) + throw new IllegalArgumentException("Missing required property '" + SELECTION + "'"); + + VisitorParameters parameters = parseParameters(request, path); + parameters.setFieldSet(DocIdOnly.NAME); + TestAndSetCondition condition = new TestAndSetCondition(parameters.getDocumentSelection()); + return () -> { + visitAndDelete(request, parameters, handler, condition, getProperty(request, DESTINATION)); + return true; // VisitorSession has its own throttle handling. + }; + }); + return ignoredContent; + } + private ContentChannel getDocument(HttpRequest request, DocumentPath path, ResponseHandler handler) { enqueueAndDispatch(request, handler, () -> { DocumentOperationParameters rawParameters = parametersFromRequest(request, CLUSTER, FIELD_SET); if (rawParameters.fieldSet().isEmpty()) rawParameters = rawParameters.withFieldSet(path.documentType().orElseThrow() + ":[document]"); DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> { + outstanding.decrementAndGet(); handle(path, handler, response, (document, jsonResponse) -> { if (document != null) { jsonResponse.writeSingleDocument(document); @@ -343,7 +433,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentPut put = parser.parsePut(in, path.id().toString()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(put::setCondition); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.put(put, parameters)); }); }); @@ -357,7 +450,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(update::setCondition); getProperty(request, CREATE, booleanParser).ifPresent(update::setCreateIfNonExistent); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.update(update, parameters)); }); }); @@ -369,7 +465,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentRemove remove = new DocumentRemove(path.id()); getProperty(request, CONDITION).map(TestAndSetCondition::new).ifPresent(remove::setCondition); DocumentOperationParameters parameters = parametersFromRequest(request, ROUTE) - .withResponseHandler(response -> handle(path, handler, response)); + .withResponseHandler(response -> { + outstanding.decrementAndGet(); + handle(path, handler, response); + }); return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters)); }); return ignoredContent; @@ -422,6 +521,29 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { return false; } + /** Dispatches enqueued requests until one is blocked. */ + void dispatchVisitEnqueued() { + try { + while (dispatchFirstVisit()); + } + catch (Exception e) { + log.log(WARNING, "Uncaught exception in /document/v1 dispatch thread", e); + } + } + + /** Attempts to dispatch the first enqueued visit operations, and returns whether this was successful. */ + private boolean dispatchFirstVisit() { + BooleanSupplier operation = visitOperations.poll(); + if (operation == null) + return false; + + if (operation.getAsBoolean()) + return true; + + visitOperations.push(operation); + return false; + } + /** * Enqueues the given request and operation, or responds with "overload" if the queue is full, * and then attempts to dispatch an enqueued operation from the head of the queue. @@ -684,7 +806,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { } /** Attempts to send the given document operation, returning false if this needs to be retried. */ - private static boolean dispatchOperation(Supplier<Result> documentOperation) { + private boolean dispatchOperation(Supplier<Result> documentOperation) { Result result = documentOperation.get(); if (result.type() == Result.ResultType.TRANSIENT_ERROR) return false; @@ -692,6 +814,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (result.type() == Result.ResultType.FATAL_ERROR) throw new RuntimeException(result.getError()); + outstanding.incrementAndGet(); return true; } @@ -846,6 +969,63 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { default void onEnd(JsonResponse response) throws IOException { } } + private void visitAndDelete(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + TestAndSetCondition condition, Optional<String> route) { + visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + DocumentRemove remove = new DocumentRemove(id); + remove.setCondition(condition); + return asyncSession.remove(remove, operationParameters); + }); + } + + private void visitAndUpdate(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + DocumentUpdate protoUpdate, Optional<String> route) { + visitAndProcess(request, parameters, handler, route, (id, operationParameters) -> { + DocumentUpdate update = new DocumentUpdate(protoUpdate); + update.setId(id); + return asyncSession.update(update, operationParameters); + }); + } + + private void visitAndProcess(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, + Optional<String> route, BiFunction<DocumentId, DocumentOperationParameters, Result> operation) { + visit(request, parameters, handler, new VisitCallback() { + @Override public void onDocument(JsonResponse response, Document document, Runnable ack, Consumer<String> onError) { + DocumentOperationParameters operationParameters = (route.isEmpty() ? parameters() + : parameters().withRoute(route.get())) + .withResponseHandler(operationResponse -> { + outstanding.decrementAndGet(); + switch (operationResponse.outcome()) { + case SUCCESS: + case NOT_FOUND: + case CONDITION_FAILED: + break; // This is all OK — the latter two are due to mitigating races. + case ERROR: + case INSUFFICIENT_STORAGE: + onError.accept(operationResponse.getTextMessage()); + break; + default: + onError.accept("Unexpected response " + operationResponse); + } + }); + visitOperations.offer(() -> { + Result result = operation.apply(document.getId(), operationParameters); + if (result.type() == Result.ResultType.TRANSIENT_ERROR) + return false; + + if (result.type() == Result.ResultType.FATAL_ERROR) + onError.accept(result.getError().getMessage()); + else + outstanding.incrementAndGet(); + + ack.run(); + return true; + }); + dispatchFirstVisit(); + } + }); + } + private void visitAndWrite(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { visit(request, parameters, handler, new VisitCallback() { @Override public void onStart(JsonResponse response) throws IOException { @@ -861,6 +1041,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } + private void visitWithRemote(HttpRequest request, VisitorParameters parameters, ResponseHandler handler) { + visit(request, parameters, handler, new VisitCallback() { }); + } + private void visit(HttpRequest request, VisitorParameters parameters, ResponseHandler handler, VisitCallback callback) { try { JsonResponse response = JsonResponse.create(request, handler); @@ -893,7 +1077,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); response.respond(Response.Status.INTERNAL_SERVER_ERROR); } - dispatcher.execute(() -> { + visitDispatcher.execute(() -> { phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index a2fbf3322fe..e5e5fef5fd0 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -27,6 +27,7 @@ import com.yahoo.documentapi.DocumentOperationParameters; import com.yahoo.documentapi.DocumentResponse; import com.yahoo.documentapi.ProgressToken; import com.yahoo.documentapi.Response; +import com.yahoo.documentapi.ResponseHandler; import com.yahoo.documentapi.Result; import com.yahoo.documentapi.SubscriptionParameters; import com.yahoo.documentapi.SubscriptionSession; @@ -69,6 +70,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -174,7 +176,7 @@ public class DocumentV1ApiTest { } @Test - public void testResponses() throws ExecutionException, InterruptedException { + public void testResponses() { RequestHandlerTestDriver driver = new RequestHandlerTestDriver(handler); List<AckToken> tokens = List.of(new AckToken(null), new AckToken(null), new AckToken(null)); // GET at non-existent path returns 404 with available paths @@ -250,6 +252,102 @@ public class DocumentV1ApiTest { "}", response.readAll()); assertEquals(400, response.getStatus()); + // POST with namespace and document type is a restricted visit with a required remote data handler ("destination") + access.expect(parameters -> { + fail("Not supposed to run"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid", POST); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"message\": \"Missing required property 'destination'\"" + + "}", response.readAll()); + assertEquals(400, response.getStatus()); + + // POST with namespace and document type is a restricted visit with a require remote data handler ("destination") + access.expect(parameters -> { + assertEquals("zero", parameters.getRemoteDataHandler()); + assertEquals("music:[document]", parameters.fieldSet()); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "We made it!"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?destination=zero", POST); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"" + + "}", response.readAll()); + assertEquals(200, response.getStatus()); + + // PUT with namespace and document type is a restricted visit with a required partial update to apply to visited documents. + access.expect(tokens.subList(2, 3)); + access.expect(parameters -> { + assertEquals("(true) and (music) and (id.namespace=='space')", parameters.getDocumentSelection()); + assertEquals("[id]", parameters.fieldSet()); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc3)), tokens.get(2)); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "Huzzah!"); + }); + access.session.expect((update, parameters) -> { + DocumentUpdate expectedUpdate = new DocumentUpdate(doc3.getDataType(), doc3.getId()); + expectedUpdate.addFieldUpdate(FieldUpdate.createAssign(doc3.getField("artist"), new StringFieldValue("Lisa Ekdahl"))); + expectedUpdate.setCondition(new TestAndSetCondition("(true) and (music) and (id.namespace=='space')")); + assertEquals(expectedUpdate, update); + parameters.responseHandler().get().handleResponse(new UpdateResponse(0, false)); + assertEquals(parameters().withRoute("zero"), parameters); + return new Result(Result.ResultType.SUCCESS, null); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=true&destination=zero", PUT, + "{" + + " \"fields\": {" + + " \"artist\": { \"assign\": \"Lisa Ekdahl\" }" + + " }" + + "}"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"" + + "}", response.readAll()); + assertEquals(200, response.getStatus()); + + // PUT with namespace, document type and group is also a restricted visit which requires a selection. + access.expect(parameters -> { + fail("Not supposed to run"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/group/troupe", PUT); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/group/troupe\"," + + " \"message\": \"Missing required property 'selection'\"" + + "}", response.readAll()); + assertEquals(400, response.getStatus()); + + // DELETE with namespace and document type is a restricted visit which deletes visited documents. + access.expect(tokens.subList(0, 1)); + access.expect(parameters -> { + assertEquals("(false) and (music) and (id.namespace=='space')", parameters.getDocumentSelection()); + assertEquals("[id]", parameters.fieldSet()); + parameters.getLocalDataHandler().onMessage(new PutDocumentMessage(new DocumentPut(doc2)), tokens.get(0)); + parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.ABORTED, "Huzzah?"); + }); + access.session.expect((remove, parameters) -> { + DocumentRemove expectedRemove = new DocumentRemove(doc2.getId()); + expectedRemove.setCondition(new TestAndSetCondition("(false) and (music) and (id.namespace=='space')")); + assertEquals(new DocumentRemove(doc2.getId()), remove); + assertEquals(parameters().withRoute("zero"), parameters); + parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId(), "boom", Response.Outcome.ERROR)); + return new Result(Result.ResultType.SUCCESS, null); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid?selection=false&destination=zero", DELETE); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"message\": \"boom\"" + + "}", response.readAll()); + assertEquals(500, response.getStatus()); + + // DELETE at the root is also a deletion visit. These require a selection. + access.expect(parameters -> { + fail("Not supposed to run"); + }); + response = driver.sendRequest("http://localhost/document/v1/space/music/docid", DELETE); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid\"," + + " \"message\": \"Missing required property 'selection'\"" + + "}", response.readAll()); + assertEquals(400, response.getStatus()); + // GET with namespace, document type and group is a restricted visit. access.expect(parameters -> { assertEquals("(music) and (id.namespace=='space') and (id.group=='best\\'')", parameters.getDocumentSelection()); @@ -423,7 +521,7 @@ public class DocumentV1ApiTest { DocumentRemove expectedRemove = new DocumentRemove(doc2.getId()); expectedRemove.setCondition(new TestAndSetCondition("false")); assertEquals(new DocumentRemove(doc2.getId()), remove); - assertEquals(parameters.withRoute("route"), parameters); + assertEquals(parameters().withRoute("route"), parameters); parameters.responseHandler().get().handleResponse(new DocumentIdResponse(0, doc2.getId())); return new Result(Result.ResultType.SUCCESS, null); }); @@ -517,27 +615,20 @@ public class DocumentV1ApiTest { "}", response2.readAll()); assertEquals(500, response2.getStatus()); - // Request timeout is dispatched after timeout has passed. - CountDownLatch latch = new CountDownLatch(1); - var assertions = Executors.newSingleThreadExecutor().submit(() -> { - access.session.expect((id, parameters) -> { - try { - latch.await(); - } - catch (InterruptedException e) { - fail("Not supposed to be interrupted"); - } - return new Result(Result.ResultType.SUCCESS, null); - }); - var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?cluster=content&fieldSet=go&timeout=1ms"); - assertSameJson("{" + - " \"pathId\": \"/document/v1/space/music/docid/one\"," + - " \"message\": \"Request timeout after 1ms\"" + - "}", response4.readAll()); - assertEquals(504, response4.getStatus()); + // Request response does not arrive before timeout has passed. + AtomicReference<ResponseHandler> handler = new AtomicReference<>(); + access.session.expect((id, parameters) -> { + handler.set(parameters.responseHandler().get()); + return new Result(Result.ResultType.SUCCESS, null); }); - latch.countDown(); - assertions.get(); + var response4 = driver.sendRequest("http://localhost/document/v1/space/music/docid/one?timeout=1ms"); + assertSameJson("{" + + " \"pathId\": \"/document/v1/space/music/docid/one\"," + + " \"message\": \"Request timeout after 1ms\"" + + "}", response4.readAll()); + assertEquals(504, response4.getStatus()); + if (handler.get() != null) // Timeout may have occurred before dispatch, or ... + handler.get().handleResponse(new Response(0)); // response may eventually arrive, but too late. driver.close(); } |