diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-09-29 08:10:05 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-29 08:10:05 +0200 |
commit | c6aded1606112a54969f56403085ca90d61dac8f (patch) | |
tree | db29615090e57241998ec0deb1c55a49632c3623 /documentapi | |
parent | 09bf1d5f22a7ae98191c94e9be591994b5125557 (diff) |
Revert "Jonmv/async doc v1 implementation"
Diffstat (limited to 'documentapi')
5 files changed, 78 insertions, 152 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json index c8cbc978a8f..f5f2a7c1845 100644 --- a/documentapi/abi-spec.json +++ b/documentapi/abi-spec.json @@ -979,9 +979,7 @@ "public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)", "public com.yahoo.documentapi.Response getNext()", "public com.yahoo.documentapi.Response getNext(int)", - "public void destroy()", - "public void setPhaser(java.util.concurrent.Phaser)", - "public void setResultType(com.yahoo.documentapi.Result$ResultType)" + "public void destroy()" ], "fields": [] }, @@ -993,15 +991,12 @@ ], "methods": [ "public void <init>(com.yahoo.documentapi.DocumentAccessParams)", - "public com.yahoo.documentapi.local.LocalSyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)", - "public com.yahoo.documentapi.local.LocalAsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)", - "public com.yahoo.documentapi.local.LocalVisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)", + "public com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)", + "public com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)", + "public com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)", "public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)", "public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)", - "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)", - "public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)", - "public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)", - "public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)" + "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)" ], "fields": [] }, diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java index ff3eeb02a71..40f26a82a89 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java @@ -20,36 +20,29 @@ import com.yahoo.documentapi.SyncSession; import com.yahoo.documentapi.UpdateResponse; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; - -import static com.yahoo.documentapi.Result.ResultType.SUCCESS; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; /** * @author bratseth - * @author jonmv */ public class LocalAsyncSession implements AsyncSession { - private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>(); + private final List<Response> responses = new LinkedList<>(); private final ResponseHandler handler; private final SyncSession syncSession; - private final Executor executor = Executors.newCachedThreadPool(); + private long requestId = 0; + private Random random = new Random(); - private AtomicLong requestId = new AtomicLong(0); - private AtomicReference<Phaser> phaser = new AtomicReference<>(); - private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS); + private synchronized long getNextRequestId() { + requestId++; + return requestId; + } public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) { this.handler = params.getResponseHandler(); + random.setSeed(System.currentTimeMillis()); syncSession = access.createSyncSession(new SyncParameters.Builder().build()); } @@ -65,15 +58,14 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) { - return send(req -> { - try { - syncSession.put(documentPut, pri); - return new DocumentResponse(req, documentPut.getDocument()); - } - catch (Exception e) { - return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR); - } - }); + long req = getNextRequestId(); + try { + syncSession.put(documentPut, pri); + addResponse(new DocumentResponse(req, documentPut.getDocument())); + } catch (Exception e) { + addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR)); + } + return new Result(req); } @Override @@ -89,14 +81,13 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result get(DocumentId id, DocumentProtocol.Priority pri) { - return send(req -> { - try { - return new DocumentResponse(req, syncSession.get(id)); - } - catch (Exception e) { - return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR); - } - }); + long req = getNextRequestId(); + try { + addResponse(new DocumentResponse(req, syncSession.get(id))); + } catch (Exception e) { + addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR)); + } + return new Result(req); } @Override @@ -106,14 +97,13 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result remove(DocumentId id, DocumentProtocol.Priority pri) { - return send(req -> { - if (syncSession.remove(new DocumentRemove(id), pri)) { - return new RemoveResponse(req, true); - } - else { - return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND); - } - }); + long req = getNextRequestId(); + if (syncSession.remove(new DocumentRemove(id), pri)) { + addResponse(new RemoveResponse(req, true)); + } else { + addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND)); + } + return new Result(req); } @Override @@ -123,24 +113,27 @@ public class LocalAsyncSession implements AsyncSession { @Override public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) { - return send(req -> { - if (syncSession.update(update, pri)) { - return new UpdateResponse(req, true); - } - else { - return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND); - } - }); + long req = getNextRequestId(); + if (syncSession.update(update, pri)) { + addResponse(new UpdateResponse(req, true)); + } else { + addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND)); + } + return new Result(req); } @Override public Response getNext() { - return responses.poll(); + if (responses.isEmpty()) { + return null; + } + int index = random.nextInt(responses.size()); + return responses.remove(index); } @Override - public Response getNext(int timeoutMilliseconds) throws InterruptedException { - return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS); + public Response getNext(int timeout) { + return getNext(); } @Override @@ -148,22 +141,6 @@ public class LocalAsyncSession implements AsyncSession { // empty } - /** - * When this is set, every operation is sent in a separate thread, which first registers with the given phaser, - * and then arrives and awaits advance so the user can trigger responses. After the response is delivered, - * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered. - * - * If this is not set, which is the default, the documents appear synchronously in the response queue or handler. - */ - public void setPhaser(Phaser phaser) { - this.phaser.set(phaser); - } - - /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */ - public void setResultType(Result.ResultType resultType) { - this.result.set(resultType); - } - private void addResponse(Response response) { if (handler != null) { handler.handleResponse(response); @@ -172,23 +149,4 @@ public class LocalAsyncSession implements AsyncSession { } } - private Result send(Function<Long, Response> responses) { - Result.ResultType resultType = result.get(); - if (resultType != SUCCESS) - return new Result(resultType, new Error()); - - long req = requestId.incrementAndGet(); - Phaser synchronizer = phaser.get(); - if (synchronizer == null) - addResponse(responses.apply(req)); - else - executor.execute(() -> { - synchronizer.register(); - synchronizer.arriveAndAwaitAdvance(); - addResponse(responses.apply(req)); - synchronizer.arriveAndDeregister(); - }); - return new Result(req); - } - } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java index e24853b9294..c69a8fb48de 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java @@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess { } @Override - public LocalSyncSession createSyncSession(SyncParameters parameters) { + public SyncSession createSyncSession(SyncParameters parameters) { return new LocalSyncSession(this); } @Override - public LocalAsyncSession createAsyncSession(AsyncParameters parameters) { + public AsyncSession createAsyncSession(AsyncParameters parameters) { return new LocalAsyncSession(parameters, this); } @Override - public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { + public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException { return new LocalVisitorSession(this, parameters); } diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java index 85be1c11fcd..f087b646ca4 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java @@ -76,13 +76,8 @@ public class LocalVisitorSession implements VisitorSession { if (state.get() != State.RUNNING) return; - try { - if (selector.accepts(new DocumentPut(document)) != Result.TRUE) - return; - } - catch (RuntimeException e) { + if (selector.accepts(new DocumentPut(document)) != Result.TRUE) return; - } Document copy = new Document(document.getDataType(), document.getId()); new FieldSetRepo().copyFields(document, copy, fieldSet); diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java index 33cae60ab93..69dc7c6da74 100644 --- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java @@ -36,12 +36,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -91,22 +85,17 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { } @Test - public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException { - LocalAsyncSession session = access.createAsyncSession(new AsyncParameters()); + public void testAsyncFetch() { + AsyncSession session = access.createAsyncSession(new AsyncParameters()); List<DocumentId> ids = new ArrayList<>(); ids.add(new DocumentId("id:music:music::1")); ids.add(new DocumentId("id:music:music::2")); ids.add(new DocumentId("id:music:music::3")); for (DocumentId id : ids) session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id)); - - // Let all async operations wait for a signal from the test thread before sending their responses, and let test - // thread wait for all responses to be delivered afterwards. - Phaser phaser = new Phaser(1); - session.setPhaser(phaser); + int timeout = 100; long startTime = System.currentTimeMillis(); - int timeoutMillis = 1000; Set<Long> outstandingRequests = new HashSet<>(); for (DocumentId id : ids) { Result result = session.get(id); @@ -115,38 +104,27 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase { outstandingRequests.add(result.getRequestId()); } - // Wait for responses in separate thread. - Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> { - try { - List<Document> documents = new ArrayList<>(); - while ( ! outstandingRequests.isEmpty()) { - int timeSinceStart = (int) (System.currentTimeMillis() - startTime); - Response response = session.getNext(timeoutMillis - timeSinceStart); - if (response == null) - throw new RuntimeException("Timed out waiting for documents"); // or return what you have - if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore - - if (response.isSuccess()) - documents.add(((DocumentResponse) response).getDocument()); - outstandingRequests.remove(response.getRequestId()); - } - assertEquals(3, documents.size()); - for (Document document : documents) - assertNotNull(document); - } - catch (InterruptedException e) { - throw new IllegalArgumentException("Interrupted while waiting for responses"); + List<Document> documents = new ArrayList<>(); + try { + while ( ! outstandingRequests.isEmpty()) { + int timeSinceStart = (int)(System.currentTimeMillis() - startTime); + Response response = session.getNext(timeout - timeSinceStart); + if (response == null) + throw new RuntimeException("Timed out waiting for documents"); // or return what you have + if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore + + if (response.isSuccess()) + documents.add(((DocumentResponse)response).getDocument()); + outstandingRequests.remove(response.getRequestId()); } - }); - - // All operations, and receiver, now waiting for this thread to arrive. - assertEquals(4, phaser.getRegisteredParties()); - assertEquals(0, phaser.getPhase()); - phaser.arrive(); - assertEquals(1, phaser.getPhase()); - phaser.awaitAdvance(phaser.arriveAndDeregister()); + } + catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for documents", e); + } - futureWithAssertions.get(1000, TimeUnit.MILLISECONDS); + assertEquals(3, documents.size()); + for (Document document : documents) + assertNotNull(document); } @Test |