summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2020-09-25 14:01:41 +0200
committerJon Marius Venstad <venstad@gmail.com>2020-09-25 14:19:41 +0200
commitd0572215be00dd764281818c9f370b7f82adb8bf (patch)
tree42b381eb40c9dd2268773f307edebb4468a46cd4 /documentapi
parent3fe333d33ea24af3f6ebed8e3986db2d82a98bb6 (diff)
Make LocalAsyncSession async when asked to, and test this
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java131
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java6
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java66
3 files changed, 131 insertions, 72 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
index 40f26a82a89..b0ecf1b23b0 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalAsyncSession.java
@@ -20,29 +20,32 @@ import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.UpdateResponse;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Random;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import static com.yahoo.documentapi.Result.ResultType.SUCCESS;
/**
* @author bratseth
+ * @author jonmv
*/
public class LocalAsyncSession implements AsyncSession {
- private final List<Response> responses = new LinkedList<>();
+ private final BlockingQueue<Response> responses = new LinkedBlockingQueue<>();
private final ResponseHandler handler;
private final SyncSession syncSession;
- private long requestId = 0;
- private Random random = new Random();
- private synchronized long getNextRequestId() {
- requestId++;
- return requestId;
- }
+ private AtomicLong requestId = new AtomicLong(0);
+ private AtomicReference<Runnable> synchronizer = new AtomicReference<>();
+ private AtomicReference<Result.ResultType> result = new AtomicReference<>(SUCCESS);
public LocalAsyncSession(AsyncParameters params, LocalDocumentAccess access) {
this.handler = params.getResponseHandler();
- random.setSeed(System.currentTimeMillis());
syncSession = access.createSyncSession(new SyncParameters.Builder().build());
}
@@ -58,14 +61,15 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result put(DocumentPut documentPut, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- syncSession.put(documentPut, pri);
- addResponse(new DocumentResponse(req, documentPut.getDocument()));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ return send(req -> {
+ try {
+ syncSession.put(documentPut, pri);
+ return new DocumentResponse(req, documentPut.getDocument());
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, documentPut.getDocument(), e.getMessage(), Response.Outcome.ERROR);
+ }
+ });
}
@Override
@@ -81,13 +85,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result get(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- try {
- addResponse(new DocumentResponse(req, syncSession.get(id)));
- } catch (Exception e) {
- addResponse(new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR));
- }
- return new Result(req);
+ return send(req -> {
+ try {
+ return new DocumentResponse(req, syncSession.get(id));
+ }
+ catch (Exception e) {
+ return new DocumentResponse(req, null, e.getMessage(), Response.Outcome.ERROR);
+ }
+ });
}
@Override
@@ -97,13 +102,14 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result remove(DocumentId id, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.remove(new DocumentRemove(id), pri)) {
- addResponse(new RemoveResponse(req, true));
- } else {
- addResponse(new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ return send(req -> {
+ if (syncSession.remove(new DocumentRemove(id), pri)) {
+ return new RemoveResponse(req, true);
+ }
+ else {
+ return new DocumentIdResponse(req, id, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ });
}
@Override
@@ -113,27 +119,24 @@ public class LocalAsyncSession implements AsyncSession {
@Override
public Result update(DocumentUpdate update, DocumentProtocol.Priority pri) {
- long req = getNextRequestId();
- if (syncSession.update(update, pri)) {
- addResponse(new UpdateResponse(req, true));
- } else {
- addResponse(new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND));
- }
- return new Result(req);
+ return send(req -> {
+ if (syncSession.update(update, pri)) {
+ return new UpdateResponse(req, true);
+ }
+ else {
+ return new DocumentUpdateResponse(req, update, "Document not found.", Response.Outcome.NOT_FOUND);
+ }
+ });
}
@Override
public Response getNext() {
- if (responses.isEmpty()) {
- return null;
- }
- int index = random.nextInt(responses.size());
- return responses.remove(index);
+ return responses.poll();
}
@Override
- public Response getNext(int timeout) {
- return getNext();
+ public Response getNext(int timeoutMilliseconds) throws InterruptedException {
+ return responses.poll(timeoutMilliseconds, TimeUnit.MILLISECONDS);
}
@Override
@@ -141,6 +144,20 @@ public class LocalAsyncSession implements AsyncSession {
// empty
}
+ /**
+ * This is run in a separate thread before and after providing the response from each accepted request, for advanced testing.
+ *
+ * If this is not set, which is the default, the documents appear synchronously in the response queue or handler.
+ */
+ public void setSynchronizer(Runnable synchronizer) {
+ this.synchronizer.set(synchronizer);
+ }
+
+ /** Sets the result type returned on subsequence operations against this. Only SUCCESS will cause Repsonses to appear. */
+ public void setResultType(Result.ResultType resultType) {
+ this.result.set(resultType);
+ }
+
private void addResponse(Response response) {
if (handler != null) {
handler.handleResponse(response);
@@ -149,4 +166,24 @@ public class LocalAsyncSession implements AsyncSession {
}
}
+ private Result send(Function<Long, Response> responses) {
+ Result.ResultType resultType = result.get();
+ if (resultType != SUCCESS)
+ return new Result(resultType, new Error());
+
+ long req = requestId.incrementAndGet();
+ synchronizer.getAndUpdate(runnable -> {
+ if (runnable == null)
+ addResponse(responses.apply(req));
+ else
+ new Thread(() -> {
+ runnable.run();
+ addResponse(responses.apply(req));
+ runnable.run();
+ }).start();
+ return runnable;
+ });
+ return new Result(req);
+ }
+
}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
index c69a8fb48de..e24853b9294 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/local/LocalDocumentAccess.java
@@ -34,17 +34,17 @@ public class LocalDocumentAccess extends DocumentAccess {
}
@Override
- public SyncSession createSyncSession(SyncParameters parameters) {
+ public LocalSyncSession createSyncSession(SyncParameters parameters) {
return new LocalSyncSession(this);
}
@Override
- public AsyncSession createAsyncSession(AsyncParameters parameters) {
+ public LocalAsyncSession createAsyncSession(AsyncParameters parameters) {
return new LocalAsyncSession(parameters, this);
}
@Override
- public VisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
+ public LocalVisitorSession createVisitorSession(VisitorParameters parameters) throws ParseException {
return new LocalVisitorSession(this, parameters);
}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
index 69dc7c6da74..ced7a0c352e 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/local/LocalDocumentApiTestCase.java
@@ -36,6 +36,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -85,17 +91,24 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
}
@Test
- public void testAsyncFetch() {
- AsyncSession session = access.createAsyncSession(new AsyncParameters());
+ public void testAsyncFetch() throws InterruptedException, ExecutionException, TimeoutException {
+ LocalAsyncSession session = access.createAsyncSession(new AsyncParameters());
List<DocumentId> ids = new ArrayList<>();
ids.add(new DocumentId("id:music:music::1"));
ids.add(new DocumentId("id:music:music::2"));
ids.add(new DocumentId("id:music:music::3"));
for (DocumentId id : ids)
session.put(new Document(access.getDocumentTypeManager().getDocumentType("music"), id));
- int timeout = 100;
+
+ // Let all async operations wait for a signal from the test thread before and after each response.
+ Phaser phaser = new Phaser(1);
+ session.setSynchronizer(() -> {
+ phaser.register();
+ phaser.awaitAdvance(phaser.arriveAndDeregister());
+ });
long startTime = System.currentTimeMillis();
+ int timeoutMillis = 1000;
Set<Long> outstandingRequests = new HashSet<>();
for (DocumentId id : ids) {
Result result = session.get(id);
@@ -104,27 +117,36 @@ public class LocalDocumentApiTestCase extends AbstractDocumentApiTestCase {
outstandingRequests.add(result.getRequestId());
}
- List<Document> documents = new ArrayList<>();
- try {
- while ( ! outstandingRequests.isEmpty()) {
- int timeSinceStart = (int)(System.currentTimeMillis() - startTime);
- Response response = session.getNext(timeout - timeSinceStart);
- if (response == null)
- throw new RuntimeException("Timed out waiting for documents"); // or return what you have
- if ( ! outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
-
- if (response.isSuccess())
- documents.add(((DocumentResponse)response).getDocument());
- outstandingRequests.remove(response.getRequestId());
+ // Wait for responses in separate thread.
+ Future<?> futureWithAssertions = Executors.newSingleThreadExecutor().submit(() -> {
+ try {
+ List<Document> documents = new ArrayList<>();
+ while (!outstandingRequests.isEmpty()) {
+ int timeSinceStart = (int) (System.currentTimeMillis() - startTime);
+ Response response = session.getNext(timeoutMillis - timeSinceStart);
+ if (response == null)
+ throw new RuntimeException("Timed out waiting for documents"); // or return what you have
+ if (!outstandingRequests.contains(response.getRequestId())) continue; // Stale: Ignore
+
+ if (response.isSuccess())
+ documents.add(((DocumentResponse) response).getDocument());
+ outstandingRequests.remove(response.getRequestId());
+ }
+ assertEquals(3, documents.size());
+ for (Document document : documents)
+ assertNotNull(document);
}
- }
- catch (InterruptedException e) {
- throw new RuntimeException("Interrupted while waiting for documents", e);
- }
+ catch (InterruptedException e) {
+ throw new IllegalArgumentException("Interrupted while waiting for responses");
+ }
+ });
+
+ // All operations, and receiver, now waiting for this thread to arrive.
+ assertEquals(1, phaser.getRegisteredParties());
+ assertEquals(0, phaser.getPhase());
+ phaser.awaitAdvance(phaser.arriveAndDeregister()); // Deregister so threads can finish without waiting after response.
- assertEquals(3, documents.size());
- for (Document document : documents)
- assertNotNull(document);
+ futureWithAssertions.get(1000, TimeUnit.MILLISECONDS);
}
@Test