aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-09-29 08:10:05 +0200
committerGitHub <noreply@github.com>2020-09-29 08:10:05 +0200
commitc6aded1606112a54969f56403085ca90d61dac8f (patch)
treedb29615090e57241998ec0deb1c55a49632c3623 /documentapi
parent09bf1d5f22a7ae98191c94e9be591994b5125557 (diff)
Revert "Jonmv/async doc v1 implementation"
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/abi-spec.json15
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java136
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java6
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java7
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java66
5 files changed, 78 insertions, 152 deletions
diff --git a/documentapi/abi-spec.json b/documentapi/abi-spec.json
index c8cbc978a8f..f5f2a7c1845 100644
--- a/documentapi/abi-spec.json
+++ b/documentapi/abi-spec.json
@@ -979,9 +979,7 @@
"public com.yahoo.documentapi.Result update(com.yahoo.document.DocumentUpdate, com.yahoo.documentapi.messagebus.protocol.DocumentProtocol$Priority)",
"public com.yahoo.documentapi.Response getNext()",
"public com.yahoo.documentapi.Response getNext(int)",
- "public void destroy()",
- "public void setPhaser(java.util.concurrent.Phaser)",
- "public void setResultType(com.yahoo.documentapi.Result$ResultType)"
+ "public void destroy()"
],
"fields": []
},
@@ -993,15 +991,12 @@
],
"methods": [
"public void <init>(com.yahoo.documentapi.DocumentAccessParams)",
- "public com.yahoo.documentapi.local.LocalSyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
- "public com.yahoo.documentapi.local.LocalAsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
- "public com.yahoo.documentapi.local.LocalVisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
+ "public com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)",
+ "public com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
+ "public com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
"public com.yahoo.documentapi.VisitorDestinationSession createVisitorDestinationSession(com.yahoo.documentapi.VisitorDestinationParameters)",
"public com.yahoo.documentapi.SubscriptionSession createSubscription(com.yahoo.documentapi.SubscriptionParameters)",
- "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)",
- "public bridge synthetic com.yahoo.documentapi.VisitorSession createVisitorSession(com.yahoo.documentapi.VisitorParameters)",
- "public bridge synthetic com.yahoo.documentapi.AsyncSession createAsyncSession(com.yahoo.documentapi.AsyncParameters)",
- "public bridge synthetic com.yahoo.documentapi.SyncSession createSyncSession(com.yahoo.documentapi.SyncParameters)"
+ "public com.yahoo.documentapi.SubscriptionSession openSubscription(com.yahoo.documentapi.SubscriptionParameters)"
],
"fields": []
},
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index ff3eeb02a71..40f26a82a89 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -20,36 +20,29 @@ import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.UpdateResponse;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.Queue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-
-import static com.yahoo.documentapi.Result.ResultType.SUCCESS;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
/**
* @author bratseth
- * @author jonmv
*/
public class LocalAsyncSession implements AsyncSession {
- private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
+ private final List<Response> responses = new LinkedList<>();
private final ResponseHandler handler;
private final SyncSession syncSession;
- private final Executor executor = Executors.newCachedThreadPool();
+ private long requestId = 0;
+ private Random random = new Random();
- private AtomicLong requestId = new AtomicLong(0);
- private AtomicReference<Phaser> phaser = new AtomicReference<>();
- private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
+ private synchronized long getNextRequestId() {
+ requestId++;
+ return requestId;
+ }
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
+ random.setSeed(System.currentTimeMillis());
syncSession = access.createSyncSession(new SyncParameters.Builder().build());
}
@@ -65,15 +58,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
- return send(req -> {
- try {
- syncSession.put(documentPut, pri);
- return new DocumentResponse(req, documentPut.getDocument());
- }
- catch (Exception e) {
- return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
- }
- });
+ long req = getNextRequestId();
+ try {
+ syncSession.put(documentPut, pri);
+ addResponse(new DocumentResponse(req, documentPut.getDocument()));
+ } catch (Exception e) {
+ addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR));
+ }
+ return new Result(req);
}
@Override
@@ -89,14 +81,13 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result get(DocumentId id, DocumentProtocol.Priority pri) {
- return send(req -> {
- try {
- return new DocumentResponse(req, syncSession.get(id));
- }
- catch (Exception e) {
- return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
- }
- });
+ long req = getNextRequestId();
+ try {
+ addResponse(new DocumentResponse(req, syncSession.get(id)));
+ } catch (Exception e) {
+ addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR));
+ }
+ return new Result(req);
}
@Override
@@ -106,14 +97,13 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
- return send(req -> {
- if (syncSession.remove(new DocumentRemove(id), pri)) {
- return new RemoveResponse(req, true);
- }
- else {
- return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
- }
- });
+ long req = getNextRequestId();
+ if (syncSession.remove(new DocumentRemove(id), pri)) {
+ addResponse(new RemoveResponse(req, true));
+ } else {
+ addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND));
+ }
+ return new Result(req);
}
@Override
@@ -123,24 +113,27 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- return send(req -> {
- if (syncSession.update(update, pri)) {
- return new UpdateResponse(req, true);
- }
- else {
- return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
- }
- });
+ long req = getNextRequestId();
+ if (syncSession.update(update, pri)) {
+ addResponse(new UpdateResponse(req, true));
+ } else {
+ addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND));
+ }
+ return new Result(req);
}
@Override
public Response getNext() {
- return responses.poll();
+ if (responses.isEmpty()) {
+ return null;
+ }
+ int index = random.nextInt(responses.size());
+ return responses.remove(index);
}
@Override
- public Response getNext(int timeoutMilliseconds) throws InterruptedException {
- return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS);
+ public Response getNext(int timeout) {
+ return getNext();
}
@Override
@@ -148,22 +141,6 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
- /**
- * When this is set, every operation is sent in a separate thread, which first registers with the given phaser,
- * and then arrives and awaits advance so the user can trigger responses. After the response is delivered,
- * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered.
- *
- * If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
- */
- public void setPhaser(Phaser phaser) {
- this.phaser.set(phaser);
- }
-
- /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
- public void setResultType(Result.ResultType resultType) {
- this.result.set(resultType);
- }
-
private void addResponse(Response response) {
if (handler != null) {
handler.handleResponse(response);
@@ -172,23 +149,4 @@ public class LocalAsyncSession implements AsyncSession {
}
}
- private Result send(Function<Long, Response> responses) {
- Result.ResultType resultType = result.get();
- if (resultType != SUCCESS)
- return new Result(resultType, new Error());
-
- long req = requestId.incrementAndGet();
- Phaser synchronizer = phaser.get();
- if (synchronizer == null)
- addResponse(responses.apply(req));
- else
- executor.execute(() -> {
- synchronizer.register();
- synchronizer.arriveAndAwaitAdvance();
- addResponse(responses.apply(req));
- synchronizer.arriveAndDeregister();
- });
- return new Result(req);
- }
-
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index e24853b9294..c69a8fb48de 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess {
}
@Override
- public LocalSyncSession createSyncSession(SyncParameters parameters) {
+ public SyncSession createSyncSession(SyncParameters parameters) {
return new LocalSyncSession(this);
}
@Override
- public LocalAsyncSession createAsyncSession(AsyncParameters parameters) {
+ public AsyncSession createAsyncSession(AsyncParameters parameters) {
return new LocalAsyncSession(parameters, this);
}
@Override
- public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
+ public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
return new LocalVisitorSession(this, parameters);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index 85be1c11fcd..f087b646ca4 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -76,13 +76,8 @@ public class LocalVisitorSession implements VisitorSession {
if (state.get() != State.RUNNING)
return;
- try {
- if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
- return;
- }
- catch (RuntimeException e) {
+ if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
return;
- }
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 33cae60ab93..69dc7c6da74 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -36,12 +36,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -91,22 +85,17 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
}
@Test
- public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException {
- LocalAsyncSession session = access.createAsyncSession(new AsyncParameters());
+ public void testAsyncFetch() {
+ AsyncSession session = access.createAsyncSession(new AsyncParameters());
List<DocumentId> ids = new ArrayList<>();
ids.add(new DocumentId("id:music:music::1"));
ids.add(new DocumentId("id:music:music::2"));
ids.add(new DocumentId("id:music:music::3"));
for (DocumentId id : ids)
session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
-
- // Let all async operations wait for a signal from the test thread before sending their responses, and let test
- // thread wait for all responses to be delivered afterwards.
- Phaser phaser = new Phaser(1);
- session.setPhaser(phaser);
+ int timeout = 100;
long startTime = System.currentTimeMillis();
- int timeoutMillis = 1000;
Set<Long> outstandingRequests = new HashSet<>();
for (DocumentId id : ids) {
Result result = session.get(id);
@@ -115,38 +104,27 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
outstandingRequests.add(result.getRequestId());
}
- // Wait for responses in separate thread.
- Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> {
- try {
- List<Document> documents = new ArrayList<>();
- while ( ! outstandingRequests.isEmpty()) {
- int timeSinceStart = (int) (System.currentTimeMillis() - startTime);
- Response response = session.getNext(timeoutMillis - timeSinceStart);
- if (response == null)
- throw new RuntimeException("Timed out waiting for documents"); // or return what you have
- if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
-
- if (response.isSuccess())
- documents.add(((DocumentResponse) response).getDocument());
- outstandingRequests.remove(response.getRequestId());
- }
- assertEquals(3, documents.size());
- for (Document document : documents)
- assertNotNull(document);
- }
- catch (InterruptedException e) {
- throw new IllegalArgumentException("Interrupted while waiting for responses");
+ List<Document> documents = new ArrayList<>();
+ try {
+ while ( ! outstandingRequests.isEmpty()) {
+ int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
+ Response response = session.getNext(timeout - timeSinceStart);
+ if (response == null)
+ throw new RuntimeException("Timed out waiting for documents"); // or return what you have
+ if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
+
+ if (response.isSuccess())
+ documents.add(((DocumentResponse)response).getDocument());
+ outstandingRequests.remove(response.getRequestId());
}
- });
-
- // All operations, and receiver, now waiting for this thread to arrive.
- assertEquals(4, phaser.getRegisteredParties());
- assertEquals(0, phaser.getPhase());
- phaser.arrive();
- assertEquals(1, phaser.getPhase());
- phaser.awaitAdvance(phaser.arriveAndDeregister());
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for documents", e);
+ }
- futureWithAssertions.get(1000, TimeUnit.MILLISECONDS);
+ assertEquals(3, documents.size());
+ for (Document document : documents)
+ assertNotNull(document);
}
@Test