From d0572215be00dd764281818c9f370b7f82adb8bf Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 25 Sep 2020 14:01:41 +0200 Subject: Make LocalAsyncSession async when asked to, and test this --- .../yahoo/documentapi/local/LocalAsyncSession.java | 131 +++++++++++++-------- .../documentapi/local/LocalDocumentAccess.java | 6 +- .../local/LocalDocumentApiTestCase.java | 66 +++++++---- 3 files changed, 131 insertions(+), 72 deletions(-) (limited to 'documentapi') diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index 40f26a82a89..b0ecf1b23b0 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -20,29 +20,32 @@ import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.UpdateResponse; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import java.util.LinkedList; -import java.util.List; -import java.util.Random; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static com.yahoo.documentapi.Result.ResultType.SUCCESS; /** * @author bratseth + * @author jonmv */ public class LocalAsyncSession implements AsyncSession { - private final List responses = new LinkedList<>(); + private final BlockingQueue responses = new LinkedBlockingQueue<>(); private final ResponseHandler handler; private final SyncSession syncSession; - private long requestId = 0; - private Random random = new Random(); - private synchronized long getNextRequestId() { - requestId++; - return requestId; - } + private AtomicLong requestId = new AtomicLong(0); + private AtomicReference synchronizer = new AtomicReference<>(); + private AtomicReference result = new AtomicReference<>(SUCCESS); public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) { this.handler = params.getResponseHandler(); - random.setSeed(System.currentTimeMillis()); syncSession = access.createSyncSession(new SyncParameters.Builder().build()); } @@ -58,14 +61,15 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - try { - syncSession.put(documentPut, pri); - addResponse(new DocumentResponse(req, documentPut.getDocument())); - } catch (Exception e) { - addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR)); - } - return new Result(req); + return send(req -> { + try { + syncSession.put(documentPut, pri); + return new DocumentResponse(req, documentPut.getDocument()); + } + catch (Exception e) { + return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR); + } + }); } @Override @@ -81,13 +85,14 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result get(DocumentId id, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - try { - addResponse(new DocumentResponse(req, syncSession.get(id))); - } catch (Exception e) { - addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR)); - } - return new Result(req); + return send(req -> { + try { + return new DocumentResponse(req, syncSession.get(id)); + } + catch (Exception e) { + return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR); + } + }); } @Override @@ -97,13 +102,14 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result remove(DocumentId id, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - if (syncSession.remove(new DocumentRemove(id), pri)) { - addResponse(new RemoveResponse(req, true)); - } else { - addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND)); - } - return new Result(req); + return send(req -> { + if (syncSession.remove(new DocumentRemove(id), pri)) { + return new RemoveResponse(req, true); + } + else { + return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND); + } + }); } @Override @@ -113,27 +119,24 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) { - long req = getNextRequestId(); - if (syncSession.update(update, pri)) { - addResponse(new UpdateResponse(req, true)); - } else { - addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND)); - } - return new Result(req); + return send(req -> { + if (syncSession.update(update, pri)) { + return new UpdateResponse(req, true); + } + else { + return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND); + } + }); } @Override public Response getNext() { - if (responses.isEmpty()) { - return null; - } - int index = random.nextInt(responses.size()); - return responses.remove(index); + return responses.poll(); } @Override - public Response getNext(int timeout) { - return getNext(); + public Response getNext(int timeoutMilliseconds) throws InterruptedException { + return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS); } @Override @@ -141,6 +144,20 @@ public class LocalAsyncSession implements AsyncSession { // empty } + /** + * This is run in a separate thread before and after providing the response from each accepted request, for advanced testing. + * + * If this is not set, which is the default, the documents appear synchronously in the response queue or handler. + */ + public void setSynchronizer(Runnable synchronizer) { + this.synchronizer.set(synchronizer); + } + + /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */ + public void setResultType(Result.ResultType resultType) { + this.result.set(resultType); + } + private void addResponse(Response response) { if (handler != null) { handler.handleResponse(response); @@ -149,4 +166,24 @@ public class LocalAsyncSession implements AsyncSession { } } + private Result send(Function responses) { + Result.ResultType resultType = result.get(); + if (resultType != SUCCESS) + return new Result(resultType, new Error()); + + long req = requestId.incrementAndGet(); + synchronizer.getAndUpdate(runnable -> { + if (runnable == null) + addResponse(responses.apply(req)); + else + new Thread(() -> { + runnable.run(); + addResponse(responses.apply(req)); + runnable.run(); + }).start(); + return runnable; + }); + return new Result(req); + } + } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index c69a8fb48de..e24853b9294 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess { } @Override - public SyncSession createSyncSession(SyncParameters parameters) { + public LocalSyncSession createSyncSession(SyncParameters parameters) { return new LocalSyncSession(this); } @Override - public AsyncSession createAsyncSession(AsyncParameters parameters) { + public LocalAsyncSession createAsyncSession(AsyncParameters parameters) { return new LocalAsyncSession(parameters, this); } @Override - public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { + public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { return new LocalVisitorSession(this, parameters); } diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java index 69dc7c6da74..ced7a0c352e 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -36,6 +36,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -85,17 +91,24 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { } @Test - public void testAsyncFetch() { - AsyncSession session = access.createAsyncSession(new AsyncParameters()); + public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException { + LocalAsyncSession session = access.createAsyncSession(new AsyncParameters()); List ids = new ArrayList<>(); ids.add(new DocumentId("id:music:music::1")); ids.add(new DocumentId("id:music:music::2")); ids.add(new DocumentId("id:music:music::3")); for (DocumentId id : ids) session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id)); - int timeout = 100; + + // Let all async operations wait for a signal from the test thread before and after each response. + Phaser phaser = new Phaser(1); + session.setSynchronizer(() -> { + phaser.register(); + phaser.awaitAdvance(phaser.arriveAndDeregister()); + }); long startTime = System.currentTimeMillis(); + int timeoutMillis = 1000; Set outstandingRequests = new HashSet<>(); for (DocumentId id : ids) { Result result = session.get(id); @@ -104,27 +117,36 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { outstandingRequests.add(result.getRequestId()); } - List documents = new ArrayList<>(); - try { - while ( ! outstandingRequests.isEmpty()) { - int timeSinceStart = (int)(System.currentTimeMillis() - startTime); - Response response = session.getNext(timeout - timeSinceStart); - if (response == null) - throw new RuntimeException("Timed out waiting for documents"); // or return what you have - if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore - - if (response.isSuccess()) - documents.add(((DocumentResponse)response).getDocument()); - outstandingRequests.remove(response.getRequestId()); + // Wait for responses in separate thread. + Future futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> { + try { + List documents = new ArrayList<>(); + while (!outstandingRequests.isEmpty()) { + int timeSinceStart = (int) (System.currentTimeMillis() - startTime); + Response response = session.getNext(timeoutMillis - timeSinceStart); + if (response == null) + throw new RuntimeException("Timed out waiting for documents"); // or return what you have + if (!outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore + + if (response.isSuccess()) + documents.add(((DocumentResponse) response).getDocument()); + outstandingRequests.remove(response.getRequestId()); + } + assertEquals(3, documents.size()); + for (Document document : documents) + assertNotNull(document); } - } - catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting for documents", e); - } + catch (InterruptedException e) { + throw new IllegalArgumentException("Interrupted while waiting for responses"); + } + }); + + // All operations, and receiver, now waiting for this thread to arrive. + assertEquals(1, phaser.getRegisteredParties()); + assertEquals(0, phaser.getPhase()); + phaser.awaitAdvance(phaser.arriveAndDeregister()); // Deregister so threads can finish without waiting after response. - assertEquals(3, documents.size()); - for (Document document : documents) - assertNotNull(document); + futureWithAssertions.get(1000, TimeUnit.MILLISECONDS); } @Test -- cgit v1.2.3