diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-10-15 09:27:03 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-10-15 09:27:03 +0200 |
commit | 463729c529464ecea03388b9f2dcf94993d03a7f (patch) | |
tree | 8563f2a05dd1eeab0bfd937d0a61aa596ffb2978 /vespaclient-container-plugin | |
parent | f07e7cde693a73d99d6d3d27dc3aa65e44d1958b (diff) |
Fix operation dispatch (parse once, throw on concurrent dispatch, excpetion safe)
Diffstat (limited to 'vespaclient-container-plugin')
2 files changed, 46 insertions, 44 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 73c5be12697..1ef8694262d 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -80,9 +80,9 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -328,7 +328,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { jsonResponse.commit(Response.Status.NOT_FOUND); }); }); - return () -> dispatchOperation(request, handler, () -> asyncSession.get(path.id(), parameters)); + return () -> dispatchOperation(() -> asyncSession.get(path.id(), parameters)); }); return ignoredContent; } @@ -342,7 +342,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) .orElse(parameters()); DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response)); - return () -> dispatchOperation(request, handler, () -> asyncSession.put(put, parameters)); + return () -> dispatchOperation(() -> asyncSession.put(put, parameters)); }); }); } @@ -357,7 +357,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) .orElse(parameters()); DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response)); - return () -> dispatchOperation(request, handler, () -> asyncSession.update(update, parameters)); + return () -> dispatchOperation(() -> asyncSession.update(update, parameters)); }); }); } @@ -370,7 +370,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute) .orElse(parameters()); DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response)); - return () -> dispatchOperation(request, handler, () -> asyncSession.remove(remove, parameters)); + return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters)); }); return ignoredContent; } @@ -403,13 +403,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { * Enqueues the given request and operation, or responds with "overload" if the queue is full, * and then attempts to dispatch an enqueued operation from the head of the queue. */ - private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) { + private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Supplier<Boolean>> operationParser) { if (enqueued.incrementAndGet() > maxThrottled) { enqueued.decrementAndGet(); overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler); return; } - operations.offer(Operation.lazilyParsed(request, handler, operationParser)); + operations.offer(new Operation(request, handler) { + @Override Supplier<Boolean> parse() { return operationParser.get(); } + }); dispatchFirst(); } @@ -592,46 +594,57 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { // ---------------------------------------------Document Operations ---------------------------------------- - @FunctionalInterface - interface Operation { + private static abstract class Operation { + + private final Lock lock = new ReentrantLock(); + private final HttpRequest request; + private final ResponseHandler handler; + private Supplier<Boolean> operation; + + Operation(HttpRequest request, ResponseHandler handler) { + this.request = request; + this.handler = handler; + } /** * Attempts to dispatch this operation to the document API, and returns whether this completed or not. * This return {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if * dispatch should be retried at a later time. */ - boolean dispatch(); + boolean dispatch() { + if ( ! lock.tryLock()) + throw new IllegalStateException("Comcurrent attempts at dispatch — this is a bug"); - /** Wraps the operation parser in an Operation that is parsed the first time it is attempted dispatched. */ - static Operation lazilyParsed(HttpRequest request, ResponseHandler handler, Supplier<Operation> parser) { - AtomicReference<Operation> operation = new AtomicReference<>(); - return () -> { - try { - return operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch(); - } - catch (IllegalArgumentException e) { - badRequest(request, e, handler); - } - catch (RuntimeException e) { - serverError(request, e, handler); - } - return true; - }; + try { + if (operation == null) + operation = parse(); + + return operation.get(); + } + catch (IllegalArgumentException e) { + badRequest(request, e, handler); + } + catch (RuntimeException e) { + serverError(request, e, handler); + } + finally { + lock.unlock(); + } + return true; } + abstract Supplier<Boolean> parse(); + } /** Attempts to send the given document operation, returning false if thes needs to be retried. */ - private static boolean dispatchOperation(HttpRequest request, ResponseHandler handler, Supplier<Result> documentOperation) { - if (request.isCancelled()) - return true; - + private static boolean dispatchOperation(Supplier<Result> documentOperation) { Result result = documentOperation.get(); if (result.type() == Result.ResultType.TRANSIENT_ERROR) return false; if (result.type() == Result.ResultType.FATAL_ERROR) - serverError(request, result.getError(), handler); + throw new RuntimeException(result.getError()); return true; } @@ -641,7 +654,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final ReadableContentChannel delegate = new ReadableContentChannel(); private final Consumer<InputStream> reader; - private final AtomicBoolean written = new AtomicBoolean(); public ForwardingContentChannel(Consumer<InputStream> reader) { this.reader = reader; @@ -652,11 +664,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { public void write(ByteBuffer buf, CompletionHandler handler) { try { delegate.write(buf, logException); - written.set(true); handler.completed(); } catch (Exception e) { - log.log(WARNING, "Failed writing request data", e); handler.failed(e); } } @@ -665,8 +675,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void close(CompletionHandler handler) { try { - if ( ! written.get()) - throw new IllegalStateException("This content channel expects content to be written prior to close"); delegate.close(logException); try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) { reader.accept(in); @@ -674,7 +682,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { handler.completed(); } catch (Exception e) { - log.log(WARNING, "Failed closing request data", e); handler.failed(e); } } @@ -834,16 +841,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); latch.countDown(); } - catch (IllegalArgumentException e) { - badRequest(request, e, handler); - } catch (ParseException e) { badRequest(request, new IllegalArgumentException(e), handler); } - catch (RuntimeException e) { - serverError(request, e, handler); - } - catch (Exception e) { + catch (IOException e) { log.log(FINE, "Failed writing response", e); } } diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 74f49367e88..6516b983044 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -229,6 +229,7 @@ public class DocumentV1ApiTest { // GET with namespace and document type is a restricted visit. access.expect(parameters -> { assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection()); + assertEquals(new ProgressToken().serializeToString(), parameters.getResumeToken().serializeToString()); throw new IllegalArgumentException("parse failure"); }); response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + new ProgressToken().serializeToString()); |