aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-29 08:39:35 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-30 10:23:34 +0200
commit11405e52f2853e44df0944bf7bbee13dc2e617a5 (patch)
tree572e253df6f8c090072da3b211b6973b75f3ef37 /documentapi/src
parentbc54f2ad34e2e4737a4de326035fdf00d5729da1 (diff)
Revert "Revert "Jonmv/async doc v1 implementation""
This reverts commit c6aded1606112a54969f56403085ca90d61dac8f.
Diffstat (limited to 'documentapi/src')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java136
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java6
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java7
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java66
4 files changed, 142 insertions, 73 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 40f26a82a89..ff3eeb02a71 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -20,29 +20,36 @@ import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.UpdateResponse;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static com.yahoo.documentapi.Result.ResultType.SUCCESS;
/**
* @author bratseth
+ * @author jonmv
*/
public class LocalAsyncSession implements AsyncSession {
- private final List<Response> responses = new LinkedList<>();
+ private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
private final ResponseHandler handler;
private final SyncSession syncSession;
- private long requestId = 0;
- private Random random = new Random();
+ private final Executor executor = Executors.newCachedThreadPool();
- private synchronized long getNextRequestId() {
- requestId++;
- return requestId;
- }
+ private AtomicLong requestId = new AtomicLong(0);
+ private AtomicReference<Phaser> phaser = new AtomicReference<>();
+ private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
- random.setSeed(System.currentTimeMillis());
syncSession = access.createSyncSession(new SyncParameters.Builder().build());
}
@@ -58,14 +65,15 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- syncSession.put(documentPut, pri);
- addResponse(new DocumentResponse(req, documentPut.getDocument()));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ return send(req -> {
+ try {
+ syncSession.put(documentPut, pri);
+ return new DocumentResponse(req, documentPut.getDocument());
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
+ }
+ });
}
@Override
@@ -81,13 +89,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result get(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- addResponse(new DocumentResponse(req, syncSession.get(id)));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ return send(req -> {
+ try {
+ return new DocumentResponse(req, syncSession.get(id));
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
+ }
+ });
}
@Override
@@ -97,13 +106,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.remove(new DocumentRemove(id), pri)) {
- addResponse(new RemoveResponse(req, true));
- } else {
- addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ return send(req -> {
+ if (syncSession.remove(new DocumentRemove(id), pri)) {
+ return new RemoveResponse(req, true);
+ }
+ else {
+ return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ });
}
@Override
@@ -113,27 +123,24 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.update(update, pri)) {
- addResponse(new UpdateResponse(req, true));
- } else {
- addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ return send(req -> {
+ if (syncSession.update(update, pri)) {
+ return new UpdateResponse(req, true);
+ }
+ else {
+ return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ });
}
@Override
public Response getNext() {
- if (responses.isEmpty()) {
- return null;
- }
- int index = random.nextInt(responses.size());
- return responses.remove(index);
+ return responses.poll();
}
@Override
- public Response getNext(int timeout) {
- return getNext();
+ public Response getNext(int timeoutMilliseconds) throws InterruptedException {
+ return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS);
}
@Override
@@ -141,6 +148,22 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
+ /**
+ * When this is set, every operation is sent in a separate thread, which first registers with the given phaser,
+ * and then arrives and awaits advance so the user can trigger responses. After the response is delivered,
+ * the thread arrives and deregisters with the phaser, so the user can wait until all responses have been delivered.
+ *
+ * If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
+ */
+ public void setPhaser(Phaser phaser) {
+ this.phaser.set(phaser);
+ }
+
+ /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
+ public void setResultType(Result.ResultType resultType) {
+ this.result.set(resultType);
+ }
+
private void addResponse(Response response) {
if (handler != null) {
handler.handleResponse(response);
@@ -149,4 +172,23 @@ public class LocalAsyncSession implements AsyncSession {
}
}
+ private Result send(Function<Long, Response> responses) {
+ Result.ResultType resultType = result.get();
+ if (resultType != SUCCESS)
+ return new Result(resultType, new Error());
+
+ long req = requestId.incrementAndGet();
+ Phaser synchronizer = phaser.get();
+ if (synchronizer == null)
+ addResponse(responses.apply(req));
+ else
+ executor.execute(() -> {
+ synchronizer.register();
+ synchronizer.arriveAndAwaitAdvance();
+ addResponse(responses.apply(req));
+ synchronizer.arriveAndDeregister();
+ });
+ return new Result(req);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index c69a8fb48de..e24853b9294 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess {
}
@Override
- public SyncSession createSyncSession(SyncParameters parameters) {
+ public LocalSyncSession createSyncSession(SyncParameters parameters) {
return new LocalSyncSession(this);
}
@Override
- public AsyncSession createAsyncSession(AsyncParameters parameters) {
+ public LocalAsyncSession createAsyncSession(AsyncParameters parameters) {
return new LocalAsyncSession(parameters, this);
}
@Override
- public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
+ public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
return new LocalVisitorSession(this, parameters);
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
index f087b646ca4..85be1c11fcd 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalVisitorSession.java
@@ -76,8 +76,13 @@ public class LocalVisitorSession implements VisitorSession {
if (state.get() != State.RUNNING)
return;
- if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ try {
+ if (selector.accepts(new DocumentPut(document)) != Result.TRUE)
+ return;
+ }
+ catch (RuntimeException e) {
return;
+ }
Document copy = new Document(document.getDataType(), document.getId());
new FieldSetRepo().copyFields(document, copy, fieldSet);
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 69dc7c6da74..33cae60ab93 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -36,6 +36,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -85,17 +91,22 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
}
@Test
- public void testAsyncFetch() {
- AsyncSession session = access.createAsyncSession(new AsyncParameters());
+ public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException {
+ LocalAsyncSession session = access.createAsyncSession(new AsyncParameters());
List<DocumentId> ids = new ArrayList<>();
ids.add(new DocumentId("id:music:music::1"));
ids.add(new DocumentId("id:music:music::2"));
ids.add(new DocumentId("id:music:music::3"));
for (DocumentId id : ids)
session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
- int timeout = 100;
+
+ // Let all async operations wait for a signal from the test thread before sending their responses, and let test
+ // thread wait for all responses to be delivered afterwards.
+ Phaser phaser = new Phaser(1);
+ session.setPhaser(phaser);
long startTime = System.currentTimeMillis();
+ int timeoutMillis = 1000;
Set<Long> outstandingRequests = new HashSet<>();
for (DocumentId id : ids) {
Result result = session.get(id);
@@ -104,27 +115,38 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
outstandingRequests.add(result.getRequestId());
}
- List<Document> documents = new ArrayList<>();
- try {
- while ( ! outstandingRequests.isEmpty()) {
- int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
- Response response = session.getNext(timeout - timeSinceStart);
- if (response == null)
- throw new RuntimeException("Timed out waiting for documents"); // or return what you have
- if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
-
- if (response.isSuccess())
- documents.add(((DocumentResponse)response).getDocument());
- outstandingRequests.remove(response.getRequestId());
+ // Wait for responses in separate thread.
+ Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ List<Document> documents = new ArrayList<>();
+ while ( ! outstandingRequests.isEmpty()) {
+ int timeSinceStart = (int) (System.currentTimeMillis() - startTime);
+ Response response = session.getNext(timeoutMillis - timeSinceStart);
+ if (response == null)
+ throw new RuntimeException("Timed out waiting for documents"); // or return what you have
+ if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
+
+ if (response.isSuccess())
+ documents.add(((DocumentResponse) response).getDocument());
+ outstandingRequests.remove(response.getRequestId());
+ }
+ assertEquals(3, documents.size());
+ for (Document document : documents)
+ assertNotNull(document);
}
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for documents", e);
- }
+ catch (InterruptedException e) {
+ throw new IllegalArgumentException("Interrupted while waiting for responses");
+ }
+ });
+
+ // All operations, and receiver, now waiting for this thread to arrive.
+ assertEquals(4, phaser.getRegisteredParties());
+ assertEquals(0, phaser.getPhase());
+ phaser.arrive();
+ assertEquals(1, phaser.getPhase());
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
- assertEquals(3, documents.size());
- for (Document document : documents)
- assertNotNull(document);
+ futureWithAssertions.get(1000, TimeUnit.MILLISECONDS);
}
@Test