summaryrefslogtreecommitdiffstats
path: root/vespaclient-container-plugin
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-02-02 13:58:47 +0100
committerJon Marius Venstad <venstad@gmail.com>2021-02-03 09:54:15 +0100
commitd99a6b2c40438450fd88d16d1e27eb4190c5b228 (patch)
tree54dbd1c2f9c4a48daf015702e486cf98063d51c9 /vespaclient-container-plugin
parentada7fdd880d0e8a8f64a689495dc27516803ca0a (diff)
Fire phasers!
Diffstat (limited to 'vespaclient-container-plugin')
-rw-r--r--vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java37
1 files changed, 17 insertions, 20 deletions
diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
index 9fd97f0c9f3..894b855c86c 100644
--- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
+++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java
@@ -60,6 +60,7 @@ import com.yahoo.search.query.ParameterParser;
import com.yahoo.text.Text;
import com.yahoo.vespa.config.content.AllClustersBucketSpacesConfig;
import com.yahoo.yolean.Exceptions;
+import com.yahoo.yolean.Exceptions.RunnableThrowingIOException;
import java.io.IOException;
import java.io.InputStream;
@@ -79,8 +80,8 @@ import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -157,7 +158,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
private final Deque<Operation> operations;
private final AtomicLong enqueued = new AtomicLong();
private final Map<VisitorControlHandler, VisitorSession> visits = new ConcurrentHashMap<>();
- private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
+ private final ScheduledExecutorService dispatcher = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("document-api-handler-"));
private final Map<String, Map<Method, Handler>> handlers = defineApi();
@Inject
@@ -184,10 +185,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
this.asyncSession = access.createAsyncSession(new AsyncParameters());
this.clusters = parseClusters(clusterListConfig, bucketSpacesConfig);
this.operations = new ConcurrentLinkedDeque<>();
- this.executor.scheduleWithFixedDelay(this::dispatchEnqueued,
- executorConfig.resendDelayMillis(),
- executorConfig.resendDelayMillis(),
- TimeUnit.MILLISECONDS);
+ this.dispatcher.scheduleWithFixedDelay(this::dispatchEnqueued,
+ executorConfig.resendDelayMillis(),
+ executorConfig.resendDelayMillis(),
+ TimeUnit.MILLISECONDS);
}
// ------------------------------------------------ Requests -------------------------------------------------
@@ -236,7 +237,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
@Override
public void destroy() {
- executor.shutdown();
+ dispatcher.shutdown();
Instant doom = clock.instant().plus(Duration.ofSeconds(20));
while ( ! operations.isEmpty() && clock.instant().isBefore(doom))
dispatchEnqueued();
@@ -248,8 +249,8 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
visits.values().forEach(VisitorSession::destroy);
try {
- if ( ! executor.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
- executor.shutdownNow();
+ if ( ! dispatcher.awaitTermination(Duration.between(clock.instant(), doom).toMillis(), TimeUnit.MILLISECONDS))
+ dispatcher.shutdownNow();
}
catch (InterruptedException e) {
log.log(WARNING, "Interrupted waiting for /document/v1 executor to shut down");
@@ -548,7 +549,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
json.writeArrayFieldStart("documents");
}
- synchronized void writeDocumentValue(Document document) throws IOException {
+ synchronized void writeDocumentValue(Document document) {
new JsonWriter(json).write(document);
}
@@ -619,7 +620,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
});
}
- private static void loggingException(Exceptions.RunnableThrowingIOException runnable) {
+ private static void loggingException(RunnableThrowingIOException runnable) {
try {
runnable.run();
}
@@ -824,6 +825,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
path.documentType(),
List.of(FixedBucketSpaces.defaultSpace(), FixedBucketSpaces.globalSpace()),
getProperty(request, BUCKET_SPACE)));
+
return parameters;
}
@@ -831,7 +833,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
try {
JsonResponse response = JsonResponse.create(request, handler);
response.writeDocumentsArrayStart();
- CountDownLatch latch = new CountDownLatch(1);
+ Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread.
parameters.setLocalDataHandler(new DumpVisitorDataHandler() {
@Override public void onDocument(Document doc, long timeStamp) {
loggingException(() -> {
@@ -864,20 +866,15 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler {
response.writeMessage(message != null ? message : "Visiting failed");
response.respond(Response.Status.INTERNAL_SERVER_ERROR);
}
- executor.execute(() -> {
- try {
- latch.await(); // We may get here while dispatching thread is still putting us in the map.
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ dispatcher.execute(() -> {
+ phaser.arriveAndAwaitAdvance(); // We may get here while dispatching thread is still putting us in the map.
visits.remove(this).destroy();
});
});
}
});
visits.put(parameters.getControlHandler(), access.createVisitorSession(parameters));
- latch.countDown();
+ phaser.arriveAndDeregister();
}
catch (ParseException e) {
badRequest(request, new IllegalArgumentException(e), handler);