summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-10-15 09:27:03 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-10-15 09:27:03 +0200
commit463729c529464ecea03388b9f2dcf94993d03a7f (patch)
tree8563f2a05dd1eeab0bfd937d0a61aa596ffb2978 /vespaclient-container-plugin
parentf07e7cde693a73d99d6d3d27dc3aa65e44d1958b (diff)
Fix operation dispatch (parse once, throw on concurrent dispatch, excpetion safe)
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java89
-rw-r--r--vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java1
2 files changed, 46 insertions, 44 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 73c5be12697..1ef8694262d 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -80,9 +80,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -328,7 +328,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
jsonResponse.commit(Response.Status.NOT_FOUND);
});
});
- return () -> dispatchOperation(request, handler, () -> asyncSession.get(path.id(), parameters));
+ return () -> dispatchOperation(() -> asyncSession.get(path.id(), parameters));
});
return ignoredContent;
}
@@ -342,7 +342,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
.orElse(parameters());
DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response));
- return () -> dispatchOperation(request, handler, () -> asyncSession.put(put, parameters));
+ return () -> dispatchOperation(() -> asyncSession.put(put, parameters));
});
});
}
@@ -357,7 +357,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
.orElse(parameters());
DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response));
- return () -> dispatchOperation(request, handler, () -> asyncSession.update(update, parameters));
+ return () -> dispatchOperation(() -> asyncSession.update(update, parameters));
});
});
}
@@ -370,7 +370,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
DocumentOperationParameters rawParameters = getProperty(request, ROUTE).map(parameters()::withRoute)
.orElse(parameters());
DocumentOperationParameters parameters = rawParameters.withResponseHandler(response -> handle(path, handler, response));
- return () -> dispatchOperation(request, handler, () -> asyncSession.remove(remove, parameters));
+ return () -> dispatchOperation(() -> asyncSession.remove(remove, parameters));
});
return ignoredContent;
}
@@ -403,13 +403,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
* Enqueues the given request and operation, or responds with "overload" if the queue is full,
* and then attempts to dispatch an enqueued operation from the head of the queue.
*/
- private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Operation> operationParser) {
+ private void enqueueAndDispatch(HttpRequest request, ResponseHandler handler, Supplier<Supplier<Boolean>> operationParser) {
if (enqueued.incrementAndGet() > maxThrottled) {
enqueued.decrementAndGet();
overload(request, "Rejecting execution due to overload: " + maxThrottled + " requests already enqueued", handler);
return;
}
- operations.offer(Operation.lazilyParsed(request, handler, operationParser));
+ operations.offer(new Operation(request, handler) {
+ @Override Supplier<Boolean> parse() { return operationParser.get(); }
+ });
dispatchFirst();
}
@@ -592,46 +594,57 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
// ---------------------------------------------Document Operations ----------------------------------------
- @FunctionalInterface
- interface Operation {
+ private static abstract class Operation {
+
+ private final Lock lock = new ReentrantLock();
+ private final HttpRequest request;
+ private final ResponseHandler handler;
+ private Supplier<Boolean> operation;
+
+ Operation(HttpRequest request, ResponseHandler handler) {
+ this.request = request;
+ this.handler = handler;
+ }
/**
* Attempts to dispatch this operation to the document API, and returns whether this completed or not.
* This return {@code} true if dispatch was successful, or if it failed fatally; or {@code false} if
* dispatch should be retried at a later time.
*/
- boolean dispatch();
+ boolean dispatch() {
+ if ( ! lock.tryLock())
+ throw new IllegalStateException("Comcurrent attempts at dispatch — this is a bug");
- /** Wraps the operation parser in an Operation that is parsed the first time it is attempted dispatched. */
- static Operation lazilyParsed(HttpRequest request, ResponseHandler handler, Supplier<Operation> parser) {
- AtomicReference<Operation> operation = new AtomicReference<>();
- return () -> {
- try {
- return operation.updateAndGet(value -> value != null ? value : parser.get()).dispatch();
- }
- catch (IllegalArgumentException e) {
- badRequest(request, e, handler);
- }
- catch (RuntimeException e) {
- serverError(request, e, handler);
- }
- return true;
- };
+ try {
+ if (operation == null)
+ operation = parse();
+
+ return operation.get();
+ }
+ catch (IllegalArgumentException e) {
+ badRequest(request, e, handler);
+ }
+ catch (RuntimeException e) {
+ serverError(request, e, handler);
+ }
+ finally {
+ lock.unlock();
+ }
+ return true;
}
+ abstract Supplier<Boolean> parse();
+
}
/** Attempts to send the given document operation, returning false if thes needs to be retried. */
- private static boolean dispatchOperation(HttpRequest request, ResponseHandler handler, Supplier<Result> documentOperation) {
- if (request.isCancelled())
- return true;
-
+ private static boolean dispatchOperation(Supplier<Result> documentOperation) {
Result result = documentOperation.get();
if (result.type() == Result.ResultType.TRANSIENT_ERROR)
return false;
if (result.type() == Result.ResultType.FATAL_ERROR)
- serverError(request, result.getError(), handler);
+ throw new RuntimeException(result.getError());
return true;
}
@@ -641,7 +654,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final ReadableContentChannel delegate = new ReadableContentChannel();
private final Consumer<InputStream> reader;
- private final AtomicBoolean written = new AtomicBoolean();
public ForwardingContentChannel(Consumer<InputStream> reader) {
this.reader = reader;
@@ -652,11 +664,9 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
public void write(ByteBuffer buf, CompletionHandler handler) {
try {
delegate.write(buf, logException);
- written.set(true);
handler.completed();
}
catch (Exception e) {
- log.log(WARNING, "Failed writing request data", e);
handler.failed(e);
}
}
@@ -665,8 +675,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override
public void close(CompletionHandler handler) {
try {
- if ( ! written.get())
- throw new IllegalStateException("This content channel expects content to be written prior to close");
delegate.close(logException);
try (UnsafeContentInputStream in = new UnsafeContentInputStream(delegate)) {
reader.accept(in);
@@ -674,7 +682,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
handler.completed();
}
catch (Exception e) {
- log.log(WARNING, "Failed closing request data", e);
handler.failed(e);
}
}
@@ -834,16 +841,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
latch.countDown();
}
- catch (IllegalArgumentException e) {
- badRequest(request, e, handler);
- }
catch (ParseException e) {
badRequest(request, new IllegalArgumentException(e), handler);
}
- catch (RuntimeException e) {
- serverError(request, e, handler);
- }
- catch (Exception e) {
+ catch (IOException e) {
log.log(FINE, "Failed writing response", e);
}
}
diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
index 74f49367e88..6516b983044 100644
--- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
+++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java
@@ -229,6 +229,7 @@ public class DocumentV1ApiTest {
// GET with namespace and document type is a restricted visit.
access.expect(parameters -> {
assertEquals("(music) and (id.namespace=='space')", parameters.getDocumentSelection());
+ assertEquals(new ProgressToken().serializeToString(), parameters.getResumeToken().serializeToString());
throw new IllegalArgumentException("parse failure");
});
response = driver.sendRequest("http://localhost/document/v1/space/music/docid?continuation=" + new ProgressToken().serializeToString());