aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2022-04-19 15:41:23 +0200
committerjonmv <venstad@gmail.com>2022-04-19 15:41:23 +0200
commit407a457cd46be903062316f7d944dc3be512b457 (patch)
tree8d2c007866f1f79c48fb590819264bc52a79d504 /documentapi
parent6f7cac65ca194a46ee3c52bb7fab2094a96954cb (diff)
Wait in destinatino until RPC layer times out request
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java43
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java50
2 files changed, 38 insertions, 55 deletions
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java
index a3c63772af2..81eb237713f 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/Destination.java
@@ -4,9 +4,11 @@ package com.yahoo.documentapi.messagebus;
import com.yahoo.document.DocumentRemove;
import com.yahoo.documentapi.DocumentAccess;
import com.yahoo.documentapi.DocumentAccessParams;
+import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.SyncParameters;
import com.yahoo.documentapi.SyncSession;
import com.yahoo.documentapi.local.LocalDocumentAccess;
+import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage;
@@ -28,6 +30,7 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Phaser;
/**
* Mock-up destination used for testing.
@@ -36,6 +39,8 @@ import java.util.List;
*/
public class Destination implements MessageHandler {
+ final Phaser phaser = new Phaser(1);
+
private final DestinationSession session;
private final DocumentAccess access;
private final SyncSession local;
@@ -55,33 +60,35 @@ public class Destination implements MessageHandler {
session = bus.getMessageBus().createDestinationSession("session", true, this);
}
- protected void sendReply(Reply reply) {
- session.reply(reply);
- }
-
public void handleMessage(Message msg) {
+
+ phaser.arriveAndAwaitAdvance();
Reply reply = ((DocumentMessage)msg).createReply();
try {
switch (msg.getType()) {
- case DocumentProtocol.MESSAGE_GETDOCUMENT:
- reply = new GetDocumentReply(local.get(((GetDocumentMessage)msg).getDocumentId()));
- break;
+ case DocumentProtocol.MESSAGE_GETDOCUMENT:
+ reply = new GetDocumentReply(local.get(((GetDocumentMessage)msg).getDocumentId()));
+ break;
+
+ case DocumentProtocol.MESSAGE_PUTDOCUMENT:
+ local.put(((PutDocumentMessage)msg).getDocumentPut());
+ break;
- case DocumentProtocol.MESSAGE_PUTDOCUMENT:
- local.put(((PutDocumentMessage)msg).getDocumentPut());
- break;
+ case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
+ local.remove(new DocumentRemove(((RemoveDocumentMessage)msg).getDocumentId()));
+ break;
- case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
- local.remove(new DocumentRemove(((RemoveDocumentMessage)msg).getDocumentId()));
- break;
+ case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
+ local.update(((UpdateDocumentMessage)msg).getDocumentUpdate());
+ break;
- case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
- local.update(((UpdateDocumentMessage)msg).getDocumentUpdate());
- break;
+ case DocumentProtocol.MESSAGE_CREATEVISITOR:
+ ((CreateVisitorReply) reply).setLastBucket(ProgressToken.FINISHED_BUCKET);
+ break;
- default:
- throw new UnsupportedOperationException("Unsupported message type '" + msg.getType() + "'.");
+ default:
+ throw new UnsupportedOperationException("Unsupported message type '" + msg.getType() + "'.");
}
} catch (Exception e) {
reply = new EmptyReply();
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java
index d1e0d7685c6..44b600fe69f 100644
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/MessageBusDocumentApiTestCase.java
@@ -71,7 +71,7 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase {
params.setTraceLevel(9);
access = new MessageBusDocumentAccess(params);
- destination = new VisitableDestination(slobrokConfigId, params.getDocumentManagerConfigId());
+ destination = new Destination(slobrokConfigId, params.getDocumentManagerConfigId());
}
@After
@@ -81,25 +81,6 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase {
slobrok.stop();
}
- private static class VisitableDestination extends Destination {
- private VisitableDestination(String slobrokConfigId, String documentManagerConfigId) {
- super(slobrokConfigId, documentManagerConfigId);
- }
-
- public void handleMessage(Message msg) {
- if (msg.getType() == DocumentProtocol.MESSAGE_CREATEVISITOR) {
- Reply reply = ((DocumentMessage)msg).createReply();
- msg.swapState(reply);
- CreateVisitorReply visitorReply = (CreateVisitorReply)reply;
- visitorReply.setLastBucket(ProgressToken.FINISHED_BUCKET);
- sendReply(reply);
- } else {
- super.handleMessage(msg);
- }
- }
- }
-
-
@Test
public void requireThatVisitorSessionWorksWithMessageBus() throws ParseException, InterruptedException {
VisitorParameters parameters = new VisitorParameters("id.user==1234");
@@ -121,23 +102,18 @@ public class MessageBusDocumentApiTestCase extends AbstractDocumentApiTestCase {
DocumentType type = access().getDocumentTypeManager().getDocumentType("music");
Document doc1 = new Document(type, new DocumentId("id:ns:music::1"));
- // The setup is broken. Zero or negative timeout has semantics in messagebus, so ensuring a timeout is impossible,as negative
- // remaining time is converted to 1 ms further down. We therefore try tenfold, and accept > 0 timeouts as success ... (;☉_☉)
- // We could return a timeout from the destination, but then we're not testing what we want, namely that the sender times out.
- int attempts = 10;
- for (int i = 0; ++i <= attempts; ) {
- assertTrue(session.put(new DocumentPut(doc1),
- DocumentOperationParameters.parameters()
- .withResponseHandler(result -> {
- response.set(result);
- latch.countDown();
- })
- .withDeadline(Instant.now().plusMillis(40)))
- .isSuccess());
- assertTrue(latch.await(60, TimeUnit.SECONDS));
- if (response.get().outcome() == Outcome.TIMEOUT) break;
- if (i == attempts) assertEquals(Response.Outcome.TIMEOUT, response.get().outcome());
- }
+ destination.phaser.register();
+ assertTrue(session.put(new DocumentPut(doc1),
+ DocumentOperationParameters.parameters()
+ .withResponseHandler(result -> {
+ response.set(result);
+ latch.countDown();
+ })
+ .withDeadline(Instant.now().plusMillis(100)))
+ .isSuccess());
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+ assertEquals(Response.Outcome.TIMEOUT, response.get().outcome());
+ destination.phaser.arriveAndDeregister();
session.destroy();
}