From ea820868d666739da68850ecae07d20628d941b6 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 2 Sep 2020 10:09:52 +0000 Subject: - Add test for sequencing. - Also sequence multiple operations to the same id as the originating request. --- .../jdisc/messagebus/MbusRequestContext.java | 13 ++- .../DocumentProcessingHandlerForkTestCase.java | 113 +++++++++++++++++++-- .../messagebus/jdisc/MbusRequestTestCase.java | 4 +- 3 files changed, 118 insertions(+), 12 deletions(-) diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java index 09d25f60835..2b9042bafac 100644 --- a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java @@ -51,6 +51,7 @@ public class MbusRequestContext implements RequestContext, ResponseHandler { // document being processed is a resource and is then grabbing more resources of // the same type without releasing its own resources. public final static String internalNoThrottledSource = "internalNoThrottledSource"; + private final static String internalNoThrottledSourcePath = "/" + internalNoThrottledSource; public MbusRequestContext(MbusRequest request, ResponseHandler responseHandler, ComponentRegistry docprocServiceComponentRegistry, @@ -96,12 +97,16 @@ public class MbusRequestContext implements RequestContext, ResponseHandler { } long inputSequenceId = requestMsg.getSequenceId(); ResponseMerger responseHandler = new ResponseMerger(requestMsg, messages.size(), this); + int numMsgWithOriginalSequenceId = 0; for (Message message : messages) { + if (message.getSequenceId() == inputSequenceId) numMsgWithOriginalSequenceId++; + } + for (Message message : messages) { + String path = internalNoThrottledSourcePath; + if ((numMsgWithOriginalSequenceId == 1) && (message.getSequenceId() == inputSequenceId)) + path = getUri().getPath(); // See comment for internalNoThrottledSource - dispatchRequest(message, - message.getSequenceId() == inputSequenceId ? getUri().getPath() - : "/" + internalNoThrottledSource, - responseHandler); + dispatchRequest(message, path, responseHandler); } } diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java index db92d16b95e..d5a09e921b6 100644 --- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java @@ -5,19 +5,33 @@ import com.yahoo.collections.Pair; import com.yahoo.docproc.CallStack; import com.yahoo.docproc.DocumentProcessor; import com.yahoo.docproc.Processing; -import com.yahoo.document.*; +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.Field; import com.yahoo.document.datatypes.StringFieldValue; import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.Trace; import org.junit.Test; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** * @author Einar M R Rosenvinge @@ -29,6 +43,7 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan private static final String TOMANY = "many"; private static final String TOONE = "toone"; private static final String TOZERO = "tozero"; + private static final String TOMULTIPLY = "multiply"; private final DocumentType type; public DocumentProcessingHandlerForkTestCase() { @@ -50,6 +65,65 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan putToZero(); } + private Trace processDocument(String chain, String id, int numAcks) throws InterruptedException { + assertTrue(sendMessage(chain, createPutDocumentMessage(id))); + for (int i = 0; i < numAcks; i++) { + Message remoteMsg = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertTrue(remoteMsg instanceof PutDocumentMessage); + remoteMsg.getTrace().trace(1, "remoteServer.ack(" + id + ")"); + remoteServer.ackMessage(remoteMsg); + } + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + return reply.getTrace(); + } + + private int countMatches(String text, String pattern) { + int count = 0; + for (int fromIndex = text.indexOf(pattern); fromIndex >= 0; fromIndex = text.indexOf(pattern, fromIndex+1)) { + count++; + } + return count; + } + private int countSequencerMessages(String text) { + return countMatches(text, "Sequencer sending message with sequence id"); + } + boolean containsSequenceId(String trace, long sequence) { + return trace.contains("Sequencer sending message with sequence id '" + sequence + "'."); + } + private void testNoExtraSequencingOfNormalOp() throws InterruptedException { + String trace = processDocument(TOMULTIPLY, "id:ns:baz::noop", 1).toString(); + assertTrue(containsSequenceId(trace, 553061876)); + assertEquals(1, countSequencerMessages(trace)); + } + private void testSequencingWhenChangingId() throws InterruptedException { + String trace = processDocument(TOMULTIPLY, "id:ns:baz::transform", 1).toString(); + assertTrue(containsSequenceId(trace, 2033581295)); + assertTrue(containsSequenceId(trace, -633118987)); + assertEquals(2, countSequencerMessages(trace)); + } + private void testSequencingWhenAddingOtherId() throws InterruptedException { + String trace = processDocument(TOMULTIPLY, "id:ns:baz::append", 2).toString(); + assertTrue(containsSequenceId(trace, -334982203)); + assertTrue(containsSequenceId(trace, -633118987)); + assertEquals(2, countSequencerMessages(trace)); + } + private void testSequencingWhenAddingSameId() throws InterruptedException { + String trace = processDocument(TOMULTIPLY, "id:ns:baz::append_same", 2).toString(); + assertTrue(containsSequenceId(trace, 1601327001)); + assertEquals(3, countSequencerMessages(trace)); + assertEquals(3, countMatches(trace, "Sequencer sending message with sequence id '1601327001'.")); + } + + @Test + public void testSequencing() throws InterruptedException { + testNoExtraSequencingOfNormalOp(); + testSequencingWhenChangingId(); + testSequencingWhenAddingOtherId(); + testSequencingWhenAddingSameId(); + } + private void putToManyAllInSameBucket() throws InterruptedException { assertPutMessages(createPutDocumentMessage(), TOMANYALLINSAMEBUCKET, "id:123456:baz:n=11111:foo:er:bra", @@ -94,17 +168,21 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan @Override protected List> getCallStacks() { - ArrayList> stacks = new ArrayList<>(5); + ArrayList> stacks = new ArrayList<>(6); stacks.add(new Pair<>(TOMANYALLINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsAllInSameBucketProcessor()))); stacks.add(new Pair<>(TOMANYSOMEINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsSomeInSameBucketProcessor()))); stacks.add(new Pair<>(TOMANY, new CallStack().addLast(new OneToManyDocumentsProcessor()))); stacks.add(new Pair<>(TOONE, new CallStack().addLast(new OneToOneDocumentsProcessor()))); stacks.add(new Pair<>(TOZERO, new CallStack().addLast(new OneToZeroDocumentsProcessor()))); + stacks.add(new Pair<>(TOMULTIPLY, new CallStack().addLast(new MultiplyProcessor()))); return stacks; } protected PutDocumentMessage createPutDocumentMessage() { - Document document = new Document(getType(), "id:ns:baz::bar"); + return createPutDocumentMessage("id:ns:baz::bar"); + } + protected PutDocumentMessage createPutDocumentMessage(String id) { + Document document = new Document(getType(), id); document.setFieldValue("blahblah", new StringFieldValue("This is a test.")); return new PutDocumentMessage(new DocumentPut(document)); } @@ -144,7 +222,7 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan } } - public class OneToOneDocumentsProcessor extends DocumentProcessor { + public static class OneToOneDocumentsProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { @@ -166,7 +244,7 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan } } - public class OneToZeroDocumentsProcessor extends DocumentProcessor { + public static class OneToZeroDocumentsProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { @@ -210,4 +288,25 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan } + public class MultiplyProcessor extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + List docs = processing.getDocumentOperations(); + assertEquals(1, docs.size()); + DocumentId id = docs.get(0).getId(); + if ("id:ns:baz::transform".equals(id.toString())) { + docs.clear(); + docs.add(new DocumentPut(type, "id:ns:baz::appended")); + } else if ("id:ns:baz::append".equals(id.toString())) { + docs.add(new DocumentPut(type, "id:ns:baz::appended")); + } else if ("id:ns:baz::append_same".equals(id.toString())) { + docs.add(new DocumentPut(type, "id:ns:baz::append_same")); + + } + return Progress.DONE; + } + + } + } diff --git a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java index 3d998a239f1..c68ab4e6742 100644 --- a/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java +++ b/jdisc_messagebus_service/src/test/java/com/yahoo/messagebus/jdisc/MbusRequestTestCase.java @@ -9,7 +9,9 @@ import org.junit.Test; import java.net.URI; -import static org.junit.Assert.*; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @author Simon Thoresen Hult -- cgit v1.2.3