diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-09-29 08:39:35 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-09-30 10:23:34 +0200 |
commit | 11405e52f2853e44df0944bf7bbee13dc2e617a5 (patch) | |
tree | 572e253df6f8c090072da3b211b6973b75f3ef37 /documentapi/src | |
parent | bc54f2ad34e2e4737a4de326035fdf00d5729da1 (diff) |
Revert "Revert "Jonmv/async doc v1 implementation""
This reverts commit c6aded1606112a54969f56403085ca90d61dac8f.
Diffstat (limited to 'documentapi/src')
4 files changed, 142 insertions, 73 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 40f26a82a89..ff3eeb02a71 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -20,29 +20,36 @@ import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.UpdateResponse; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static com.yahoo.documentapi.Result.ResultType.SUCCESS; /** * @author bratseth + * @author jonmv */ public class LocalAsyncSession implements AsyncSession { - private final List<Response> responses = new LinkedList<>(); + private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(); private final ResponseHandler handler; private final SyncSession syncSession; - private long requestId = 0; - private Random random = new Random(); + private final Executor executor = Executors.newCachedThreadPool(); - private synchronized long getNextRequestId() { - requestId++; - return requestId; - } + private AtomicLong requestId = new AtomicLong(0); + private AtomicReference<Phaser> phaser = new AtomicReference<>(); + private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS); public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) { this.handler = params.getResponseHandler(); - random.setSeed(System.currentTimeMillis()); syncSession = access.createSyncSession(new SyncParameters.Builder().build()); } @@ -58,14 +65,15 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - try { - syncSession.put(documentPut, pri); - addResponse(new DocumentResponse(req, documentPut.getDocument())); - } catch (Exception e) { - addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR)); - } - return new Result(req); + return send(req -> { + try { + syncSession.put(documentPut, pri); + return new DocumentResponse(req, documentPut.getDocument()); + } + catch (Exception e) { + return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR); + } + }); } @Override @@ -81,13 +89,14 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result get(DocumentId id, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - try { - addResponse(new DocumentResponse(req, syncSession.get(id))); - } catch (Exception e) { - addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR)); - } - return new Result(req); + return send(req -> { + try { + return new DocumentResponse(req, syncSession.get(id)); + } + catch (Exception e) { + return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR); + } + }); } @Override @@ -97,13 +106,14 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result remove(DocumentId id, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - if (syncSession.remove(new DocumentRemove(id), pri)) { - addResponse(new RemoveResponse(req, true)); - } else { - addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND)); - } - return new Result(req); + return send(req -> { + if (syncSession.remove(new DocumentRemove(id), pri)) { + return new RemoveResponse(req, true); + } + else { + return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND); + } + }); } @Override @@ -113,27 +123,24 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - if (syncSession.update(update, pri)) { - addResponse(new UpdateResponse(req, true)); - } else { - addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND)); - } - return new Result(req); + return send(req -> { + if (syncSession.update(update, pri)) { + return new UpdateResponse(req, true); + } + else { + return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND); + } + }); } @Override public Response getNext() { - if (responses.isEmpty()) { - return null; - } - int index = random.nextInt(responses.size()); - return responses.remove(index); + return responses.poll(); } @Override - public Response getNext(int timeout) { - return getNext(); + public Response getNext(int timeoutMilliseconds) throws InterruptedException { + return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS); } @Override @@ -141,6 +148,22 @@ public class LocalAsyncSession implements AsyncSession { // empty } + /** + * When this is set, every operation is sent in a separate thread, which first registers with the given phaser, + * and then arrives and awaits advance so the user can trigger responses. After the response is delivered, + * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered. + * + * If this is not set, which is the default, the documents appear synchronously in the response queue or handler. + */ + public void setPhaser(Phaser phaser) { + this.phaser.set(phaser); + } + + /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */ + public void setResultType(Result.ResultType resultType) { + this.result.set(resultType); + } + private void addResponse(Response response) { if (handler != null) { handler.handleResponse(response); @@ -149,4 +172,23 @@ public class LocalAsyncSession implements AsyncSession { } } + private Result send(Function<Long, Response> responses) { + Result.ResultType resultType = result.get(); + if (resultType != SUCCESS) + return new Result(resultType, new Error()); + + long req = requestId.incrementAndGet(); + Phaser synchronizer = phaser.get(); + if (synchronizer == null) + addResponse(responses.apply(req)); + else + executor.execute(() -> { + synchronizer.register(); + synchronizer.arriveAndAwaitAdvance(); + addResponse(responses.apply(req)); + synchronizer.arriveAndDeregister(); + }); + return new Result(req); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index c69a8fb48de..e24853b9294 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess { } @Override - public SyncSession createSyncSession(SyncParameters parameters) { + public LocalSyncSession createSyncSession(SyncParameters parameters) { return new LocalSyncSession(this); } @Override - public AsyncSession createAsyncSession(AsyncParameters parameters) { + public LocalAsyncSession createAsyncSession(AsyncParameters parameters) { return new LocalAsyncSession(parameters, this); } @Override - public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { + public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { return new LocalVisitorSession(this, parameters); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index f087b646ca4..85be1c11fcd 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -76,8 +76,13 @@ public class LocalVisitorSession implements VisitorSession { if (state.get() != State.RUNNING) return; - if (selector.accepts(new DocumentPut(document)) != Result.TRUE) + try { + if (selector.accepts(new DocumentPut(document)) != Result.TRUE) + return; + } + catch (RuntimeException e) { return; + } Document copy = new Document(document.getDataType(), document.getId()); new FieldSetRepo().copyFields(document, copy, fieldSet); diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java index 69dc7c6da74..33cae60ab93 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -36,6 +36,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -85,17 +91,22 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { } @Test - public void testAsyncFetch() { - AsyncSession session = access.createAsyncSession(new AsyncParameters()); + public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException { + LocalAsyncSession session = access.createAsyncSession(new AsyncParameters()); List<DocumentId> ids = new ArrayList<>(); ids.add(new DocumentId("id:music:music::1")); ids.add(new DocumentId("id:music:music::2")); ids.add(new DocumentId("id:music:music::3")); for (DocumentId id : ids) session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id)); - int timeout = 100; + + // Let all async operations wait for a signal from the test thread before sending their responses, and let test + // thread wait for all responses to be delivered afterwards. + Phaser phaser = new Phaser(1); + session.setPhaser(phaser); long startTime = System.currentTimeMillis(); + int timeoutMillis = 1000; Set<Long> outstandingRequests = new HashSet<>(); for (DocumentId id : ids) { Result result = session.get(id); @@ -104,27 +115,38 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { outstandingRequests.add(result.getRequestId()); } - List<Document> documents = new ArrayList<>(); - try { - while ( ! outstandingRequests.isEmpty()) { - int timeSinceStart = (int)(System.currentTimeMillis() - startTime); - Response response = session.getNext(timeout - timeSinceStart); - if (response == null) - throw new RuntimeException("Timed out waiting for documents"); // or return what you have - if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore - - if (response.isSuccess()) - documents.add(((DocumentResponse)response).getDocument()); - outstandingRequests.remove(response.getRequestId()); + // Wait for responses in separate thread. + Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> { + try { + List<Document> documents = new ArrayList<>(); + while ( ! outstandingRequests.isEmpty()) { + int timeSinceStart = (int) (System.currentTimeMillis() - startTime); + Response response = session.getNext(timeoutMillis - timeSinceStart); + if (response == null) + throw new RuntimeException("Timed out waiting for documents"); // or return what you have + if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore + + if (response.isSuccess()) + documents.add(((DocumentResponse) response).getDocument()); + outstandingRequests.remove(response.getRequestId()); + } + assertEquals(3, documents.size()); + for (Document document : documents) + assertNotNull(document); } - } - catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for documents", e); - } + catch (InterruptedException e) { + throw new IllegalArgumentException("Interrupted while waiting for responses"); + } + }); + + // All operations, and receiver, now waiting for this thread to arrive. + assertEquals(4, phaser.getRegisteredParties()); + assertEquals(0, phaser.getPhase()); + phaser.arrive(); + assertEquals(1, phaser.getPhase()); + phaser.awaitAdvance(phaser.arriveAndDeregister()); - assertEquals(3, documents.size()); - for (Document document : documents) - assertNotNull(document); + futureWithAssertions.get(1000, TimeUnit.MILLISECONDS); } @Test |