diff options
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r-- | vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java | 37 |
1 files changed, 17 insertions, 20 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index 9fd97f0c9f3..894b855c86c 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -60,6 +60,7 @@ import com.yahoo.search.query.ParameterParser; import com.yahoo.text.Text; import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig; import com.yahoo.yolean.Exceptions; +import com.yahoo.yolean.Exceptions.RunnableThrowingIOException; import java.io.IOException; import java.io.InputStream; @@ -79,8 +80,8 @@ import java.util.Optional; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -157,7 +158,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { private final Deque<Operation> operations; private final AtomicLong enqueued = new AtomicLong(); private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>(); - private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); + private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-")); private final Map<String, Map<Method, Handler>> handlers = defineApi(); @Inject @@ -184,10 +185,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { this.asyncSession = access.createAsyncSession(new AsyncParameters()); this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig); this.operations = new ConcurrentLinkedDeque<>(); - this.executor.scheduleWithFixedDelay(this::dispatchEnqueued, - executorConfig.resendDelayMillis(), - executorConfig.resendDelayMillis(), - TimeUnit.MILLISECONDS); + this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued, + executorConfig.resendDelayMillis(), + executorConfig.resendDelayMillis(), + TimeUnit.MILLISECONDS); } // ------------------------------------------------ Requests ------------------------------------------------- @@ -236,7 +237,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { @Override public void destroy() { - executor.shutdown(); + dispatcher.shutdown(); Instant doom = clock.instant().plus(Duration.ofSeconds(20)); while ( ! operations.isEmpty() && clock.instant().isBefore(doom)) dispatchEnqueued(); @@ -248,8 +249,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { visits.values().forEach(VisitorSession::destroy); try { - if ( ! executor.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) - executor.shutdownNow(); + if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS)) + dispatcher.shutdownNow(); } catch (InterruptedException e) { log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down"); @@ -548,7 +549,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { json.writeArrayFieldStart("documents"); } - synchronized void writeDocumentValue(Document document) throws IOException { + synchronized void writeDocumentValue(Document document) { new JsonWriter(json).write(document); } @@ -619,7 +620,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { }); } - private static void loggingException(Exceptions.RunnableThrowingIOException runnable) { + private static void loggingException(RunnableThrowingIOException runnable) { try { runnable.run(); } @@ -824,6 +825,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { path.documentType(), List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()), getProperty(request, BUCKET_SPACE))); + return parameters; } @@ -831,7 +833,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { try { JsonResponse response = JsonResponse.create(request, handler); response.writeDocumentsArrayStart(); - CountDownLatch latch = new CountDownLatch(1); + Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. parameters.setLocalDataHandler(new DumpVisitorDataHandler() { @Override public void onDocument(Document doc, long timeStamp) { loggingException(() -> { @@ -864,20 +866,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { response.writeMessage(message != null ? message : "Visiting failed"); response.respond(Response.Status.INTERNAL_SERVER_ERROR); } - executor.execute(() -> { - try { - latch.await(); // We may get here while dispatching thread is still putting us in the map. - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + dispatcher.execute(() -> { + phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map. visits.remove(this).destroy(); }); }); } }); visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters)); - latch.countDown(); + phaser.arriveAndDeregister(); } catch (ParseException e) { badRequest(request, new IllegalArgumentException(e), handler); |