aboutsummaryrefslogtreecommitdiffstats
path: root/docproc
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-09-02 10:09:52 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-09-02 10:09:52 +0000
commitea820868d666739da68850ecae07d20628d941b6 (patch)
tree64c1133c82b00676f7d754662b129aab78696d1f /docproc
parentf5487d3d9eda0632b3762312c478c57d83178827 (diff)
- Add test for sequencing.
- Also sequence multiple operations to the same id as the originating request.
Diffstat (limited to 'docproc')
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java13
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java113
2 files changed, 115 insertions, 11 deletions
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java
index 09d25f60835..2b9042bafac 100644
--- a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java
@@ -51,6 +51,7 @@ public class MbusRequestContext implements RequestContext, ResponseHandler {
// document being processed is a resource and is then grabbing more resources of
// the same type without releasing its own resources.
public final static String internalNoThrottledSource = "internalNoThrottledSource";
+ private final static String internalNoThrottledSourcePath = "/" + internalNoThrottledSource;
public MbusRequestContext(MbusRequest request, ResponseHandler responseHandler,
ComponentRegistry<DocprocService> docprocServiceComponentRegistry,
@@ -96,12 +97,16 @@ public class MbusRequestContext implements RequestContext, ResponseHandler {
}
long inputSequenceId = requestMsg.getSequenceId();
ResponseMerger responseHandler = new ResponseMerger(requestMsg, messages.size(), this);
+ int numMsgWithOriginalSequenceId = 0;
for (Message message : messages) {
+ if (message.getSequenceId() == inputSequenceId) numMsgWithOriginalSequenceId++;
+ }
+ for (Message message : messages) {
+ String path = internalNoThrottledSourcePath;
+ if ((numMsgWithOriginalSequenceId == 1) && (message.getSequenceId() == inputSequenceId))
+ path = getUri().getPath();
// See comment for internalNoThrottledSource
- dispatchRequest(message,
- message.getSequenceId() == inputSequenceId ? getUri().getPath()
- : "/" + internalNoThrottledSource,
- responseHandler);
+ dispatchRequest(message, path, responseHandler);
}
}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java
index db92d16b95e..d5a09e921b6 100644
--- a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java
@@ -5,19 +5,33 @@ import com.yahoo.collections.Pair;
import com.yahoo.docproc.CallStack;
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
-import com.yahoo.document.*;
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.Field;
import com.yahoo.document.datatypes.StringFieldValue;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Trace;
import org.junit.Test;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
@@ -29,6 +43,7 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan
private static final String TOMANY = "many";
private static final String TOONE = "toone";
private static final String TOZERO = "tozero";
+ private static final String TOMULTIPLY = "multiply";
private final DocumentType type;
public DocumentProcessingHandlerForkTestCase() {
@@ -50,6 +65,65 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan
putToZero();
}
+ private Trace processDocument(String chain, String id, int numAcks) throws InterruptedException {
+ assertTrue(sendMessage(chain, createPutDocumentMessage(id)));
+ for (int i = 0; i < numAcks; i++) {
+ Message remoteMsg = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertTrue(remoteMsg instanceof PutDocumentMessage);
+ remoteMsg.getTrace().trace(1, "remoteServer.ack(" + id + ")");
+ remoteServer.ackMessage(remoteMsg);
+ }
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ return reply.getTrace();
+ }
+
+ private int countMatches(String text, String pattern) {
+ int count = 0;
+ for (int fromIndex = text.indexOf(pattern); fromIndex >= 0; fromIndex = text.indexOf(pattern, fromIndex+1)) {
+ count++;
+ }
+ return count;
+ }
+ private int countSequencerMessages(String text) {
+ return countMatches(text, "Sequencer sending message with sequence id");
+ }
+ boolean containsSequenceId(String trace, long sequence) {
+ return trace.contains("Sequencer sending message with sequence id '" + sequence + "'.");
+ }
+ private void testNoExtraSequencingOfNormalOp() throws InterruptedException {
+ String trace = processDocument(TOMULTIPLY, "id:ns:baz::noop", 1).toString();
+ assertTrue(containsSequenceId(trace, 553061876));
+ assertEquals(1, countSequencerMessages(trace));
+ }
+ private void testSequencingWhenChangingId() throws InterruptedException {
+ String trace = processDocument(TOMULTIPLY, "id:ns:baz::transform", 1).toString();
+ assertTrue(containsSequenceId(trace, 2033581295));
+ assertTrue(containsSequenceId(trace, -633118987));
+ assertEquals(2, countSequencerMessages(trace));
+ }
+ private void testSequencingWhenAddingOtherId() throws InterruptedException {
+ String trace = processDocument(TOMULTIPLY, "id:ns:baz::append", 2).toString();
+ assertTrue(containsSequenceId(trace, -334982203));
+ assertTrue(containsSequenceId(trace, -633118987));
+ assertEquals(2, countSequencerMessages(trace));
+ }
+ private void testSequencingWhenAddingSameId() throws InterruptedException {
+ String trace = processDocument(TOMULTIPLY, "id:ns:baz::append_same", 2).toString();
+ assertTrue(containsSequenceId(trace, 1601327001));
+ assertEquals(3, countSequencerMessages(trace));
+ assertEquals(3, countMatches(trace, "Sequencer sending message with sequence id '1601327001'."));
+ }
+
+ @Test
+ public void testSequencing() throws InterruptedException {
+ testNoExtraSequencingOfNormalOp();
+ testSequencingWhenChangingId();
+ testSequencingWhenAddingOtherId();
+ testSequencingWhenAddingSameId();
+ }
+
private void putToManyAllInSameBucket() throws InterruptedException {
assertPutMessages(createPutDocumentMessage(), TOMANYALLINSAMEBUCKET,
"id:123456:baz:n=11111:foo:er:bra",
@@ -94,17 +168,21 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan
@Override
protected List<Pair<String, CallStack>> getCallStacks() {
- ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(5);
+ ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(6);
stacks.add(new Pair<>(TOMANYALLINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsAllInSameBucketProcessor())));
stacks.add(new Pair<>(TOMANYSOMEINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsSomeInSameBucketProcessor())));
stacks.add(new Pair<>(TOMANY, new CallStack().addLast(new OneToManyDocumentsProcessor())));
stacks.add(new Pair<>(TOONE, new CallStack().addLast(new OneToOneDocumentsProcessor())));
stacks.add(new Pair<>(TOZERO, new CallStack().addLast(new OneToZeroDocumentsProcessor())));
+ stacks.add(new Pair<>(TOMULTIPLY, new CallStack().addLast(new MultiplyProcessor())));
return stacks;
}
protected PutDocumentMessage createPutDocumentMessage() {
- Document document = new Document(getType(), "id:ns:baz::bar");
+ return createPutDocumentMessage("id:ns:baz::bar");
+ }
+ protected PutDocumentMessage createPutDocumentMessage(String id) {
+ Document document = new Document(getType(), id);
document.setFieldValue("blahblah", new StringFieldValue("This is a test."));
return new PutDocumentMessage(new DocumentPut(document));
}
@@ -144,7 +222,7 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan
}
}
- public class OneToOneDocumentsProcessor extends DocumentProcessor {
+ public static class OneToOneDocumentsProcessor extends DocumentProcessor {
@Override
public Progress process(Processing processing) {
@@ -166,7 +244,7 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan
}
}
- public class OneToZeroDocumentsProcessor extends DocumentProcessor {
+ public static class OneToZeroDocumentsProcessor extends DocumentProcessor {
@Override
public Progress process(Processing processing) {
@@ -210,4 +288,25 @@ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHan
}
+ public class MultiplyProcessor extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ List<DocumentOperation> docs = processing.getDocumentOperations();
+ assertEquals(1, docs.size());
+ DocumentId id = docs.get(0).getId();
+ if ("id:ns:baz::transform".equals(id.toString())) {
+ docs.clear();
+ docs.add(new DocumentPut(type, "id:ns:baz::appended"));
+ } else if ("id:ns:baz::append".equals(id.toString())) {
+ docs.add(new DocumentPut(type, "id:ns:baz::appended"));
+ } else if ("id:ns:baz::append_same".equals(id.toString())) {
+ docs.add(new DocumentPut(type, "id:ns:baz::append_same"));
+
+ }
+ return Progress.DONE;
+ }
+
+ }
+
}