summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-10-26 10:29:11 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-10-26 10:29:11 +0200
commit0a632b0a29de4d5117293dfaf48c3aac89fab674 (patch)
tree549ec722f0cb30b232973a89cdb75ca769154da0 /vespaclient-container-plugin
parent63fee85aa2a45f1550a0cfe445e5964ccfca40e4 (diff)
onDone is only called normally _after_ last ack is run, so simplify
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java82
1 files changed, 35 insertions, 47 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 92e2c95f704..0303c36cb4d 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -1133,16 +1133,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
@Override public void onEnd(JsonResponse response) throws IOException {
- // Wait for other writers to complete, then write what remains here.
- while ( ! writing.compareAndSet(false, true)) {
- try {
- Thread.sleep(1);
- }
- catch (InterruptedException e) {
- log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen");
- Thread.currentThread().interrupt();
- }
- }
for (Runnable write; (write = writes.poll()) != null; write.run());
response.writeArrayEnd();
}
@@ -1161,45 +1151,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
callback.onStart(response);
VisitorControlHandler controller = new VisitorControlHandler() {
@Override public void onDone(CompletionCode code, String message) {
- defaultExecutor.execute(() -> {
- super.onDone(code, message);
- loggingException(() -> {
- try (response) {
- callback.onEnd(response);
-
- if (getVisitorStatistics() != null)
- response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
-
- int status = Response.Status.BAD_GATEWAY;
- switch (code) {
- case TIMEOUT:
- if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
- response.writeMessage("No buckets visited within timeout of " +
- parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
- status = Response.Status.GATEWAY_TIMEOUT;
- break;
- }
- // TODO jonmv: always supply and document continuation?
- case SUCCESS: // Intentional fallthrough.
- case ABORTED: // Intentional fallthrough.
- if (error.get() == null) {
- ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
- if (progress != null && ! progress.isFinished())
- response.writeContinuation(progress.serializeToString());
-
- status = Response.Status.OK;
- break;
- }
- default:
- response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
- }
- if ( ! streaming)
- response.commit(status);
+ super.onDone(code, message);
+ loggingException(() -> {
+ try (response) {
+ callback.onEnd(response);
+
+ if (getVisitorStatistics() != null)
+ response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited());
+
+ int status = Response.Status.BAD_GATEWAY;
+ switch (code) {
+ case TIMEOUT:
+ if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) {
+ response.writeMessage("No buckets visited within timeout of " +
+ parameters.getSessionTimeoutMs() + "ms (request timeout -5s)");
+ status = Response.Status.GATEWAY_TIMEOUT;
+ break;
+ }
+ // TODO jonmv: always supply and document continuation?
+ case SUCCESS: // Intentional fallthrough.
+ case ABORTED: // Intentional fallthrough.
+ if (error.get() == null) {
+ ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken();
+ if (progress != null && ! progress.isFinished())
+ response.writeContinuation(progress.serializeToString());
+
+ status = Response.Status.OK;
+ break;
+ }
+ default:
+ response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed");
}
- });
- phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
- visits.remove(this).destroy();
+ if ( ! streaming)
+ response.commit(status);
+ }
});
+ phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
+ visits.remove(this).destroy();
}
};
if (parameters.getRemoteDataHandler() == null) {