diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-10-26 10:29:11 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-10-26 10:29:11 +0200 |
commit | 0a632b0a29de4d5117293dfaf48c3aac89fab674 (patch) | |
tree | 549ec722f0cb30b232973a89cdb75ca769154da0 /vespaclient-container-plugin | |
parent | 63fee85aa2a45f1550a0cfe445e5964ccfca40e4 (diff) |
onDone is only called normally _after_ last ack is run, so simplify
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 82 |
1 files changed, 35 insertions, 47 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 92e2c95f704..0303c36cb4d 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -1133,16 +1133,6 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } @Override public void onEnd(JsonResponse response) throws IOException { - // Wait for other writers to complete, then write what remains here. - while ( ! writing.compareAndSet(false, true)) { - try { - Thread.sleep(1); - } - catch (InterruptedException e) { - log.log(WARNING, "Interrupted waiting for visited documents to be written; this should not happen"); - Thread.currentThread().interrupt(); - } - } for (Runnable write; (write = writes.poll()) != null; write.run()); response.writeArrayEnd(); } @@ -1161,45 +1151,43 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { callback.onStart(response); VisitorControlHandler controller = new VisitorControlHandler() { @Override public void onDone(CompletionCode code, String message) { - defaultExecutor.execute(() -> { - super.onDone(code, message); - loggingException(() -> { - try (response) { - callback.onEnd(response); - - if (getVisitorStatistics() != null) - response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); - - int status = Response.Status.BAD_GATEWAY; - switch (code) { - case TIMEOUT: - if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { - response.writeMessage("No buckets visited within timeout of " + - parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); - status = Response.Status.GATEWAY_TIMEOUT; - break; - } - // TODO jonmv: always supply and document continuation? - case SUCCESS: // Intentional fallthrough. - case ABORTED: // Intentional fallthrough. - if (error.get() == null) { - ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); - if (progress != null && ! progress.isFinished()) - response.writeContinuation(progress.serializeToString()); - - status = Response.Status.OK; - break; - } - default: - response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); - } - if ( ! streaming) - response.commit(status); + super.onDone(code, message); + loggingException(() -> { + try (response) { + callback.onEnd(response); + + if (getVisitorStatistics() != null) + response.writeDocumentCount(getVisitorStatistics().getDocumentsVisited()); + + int status = Response.Status.BAD_GATEWAY; + switch (code) { + case TIMEOUT: + if ( ! hasVisitedAnyBuckets() && parameters.getVisitInconsistentBuckets()) { + response.writeMessage("No buckets visited within timeout of " + + parameters.getSessionTimeoutMs() + "ms (request timeout -5s)"); + status = Response.Status.GATEWAY_TIMEOUT; + break; + } + // TODO jonmv: always supply and document continuation? + case SUCCESS: // Intentional fallthrough. + case ABORTED: // Intentional fallthrough. + if (error.get() == null) { + ProgressToken progress = getProgress() != null ? getProgress() : parameters.getResumeToken(); + if (progress != null && ! progress.isFinished()) + response.writeContinuation(progress.serializeToString()); + + status = Response.Status.OK; + break; + } + default: + response.writeMessage(error.get() != null ? error.get() : message != null ? message : "Visiting failed"); } - }); - phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. - visits.remove(this).destroy(); + if ( ! streaming) + response.commit(status); + } }); + phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. + visits.remove(this).destroy(); } }; if (parameters.getRemoteDataHandler() == null) { |