// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.docproc.jdisc; import com.yahoo.collections.Pair; import com.yahoo.docproc.CallStack; import com.yahoo.docproc.DocumentProcessor; import com.yahoo.docproc.Processing; import com.yahoo.document.DataType; import com.yahoo.document.Document; import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentOperation; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentType; import com.yahoo.document.Field; import com.yahoo.document.datatypes.StringFieldValue; import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.Trace; import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** * @author Einar M R Rosenvinge */ public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHandlerTestBase { private static final String TOMANYALLINSAMEBUCKET = "tomanyallinsamebucket"; private static final String TOMANYSOMEINSAMEBUCKET = "tomanysomeinsamebucket"; private static final String TOMANY = "many"; private static final String TOONE = "toone"; private static final String TOZERO = "tozero"; private static final String TOMULTIPLY = "multiply"; private final DocumentType type; public DocumentProcessingHandlerForkTestCase() { this.type = new DocumentType("baz"); this.type.addField(new Field("blahblah", DataType.STRING)); } @Override public DocumentType getType() { return type; } @Test public void testMessages() throws InterruptedException { putToFourPuts(); putToManyAllInSameBucket(); putToManySomeInSameBucket(); putToOne(); putToZero(); } private Trace processDocument(String chain, String id, int numAcks) throws InterruptedException { assertTrue(sendMessage(chain, createPutDocumentMessage(id))); for (int i = 0; i < numAcks; i++) { Message remoteMsg = remoteServer.awaitMessage(60, TimeUnit.SECONDS); assertTrue(remoteMsg instanceof PutDocumentMessage); remoteMsg.getTrace().trace(1, "remoteServer.ack(" + id + ")"); remoteServer.ackMessage(remoteMsg); } Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); assertNotNull(reply); assertFalse(reply.hasErrors()); return reply.getTrace(); } private int countMatches(String text, String pattern) { int count = 0; for (int fromIndex = text.indexOf(pattern); fromIndex >= 0; fromIndex = text.indexOf(pattern, fromIndex+1)) { count++; } return count; } private int countSequencerMessages(String text) { return countMatches(text, "Sequencer sending message with sequence id"); } boolean containsSequenceId(String trace, long sequence) { return trace.contains("Sequencer sending message with sequence id '" + sequence + "'."); } private void testNoExtraSequencingOfNormalOp() throws InterruptedException { String trace = processDocument(TOMULTIPLY, "id:ns:baz::noop", 1).toString(); assertTrue(containsSequenceId(trace, 553061876)); assertEquals(1, countSequencerMessages(trace)); } private void testSequencingWhenChangingId() throws InterruptedException { String trace = processDocument(TOMULTIPLY, "id:ns:baz::transform", 1).toString(); assertTrue(containsSequenceId(trace, 2033581295)); assertTrue(containsSequenceId(trace, -633118987)); assertEquals(2, countSequencerMessages(trace)); } private void testSequencingWhenAddingOtherId() throws InterruptedException { String trace = processDocument(TOMULTIPLY, "id:ns:baz::append", 2).toString(); assertTrue(containsSequenceId(trace, -334982203)); assertTrue(containsSequenceId(trace, -633118987)); assertEquals(2, countSequencerMessages(trace)); } private void testSequencingWhenAddingSameId() throws InterruptedException { String trace = processDocument(TOMULTIPLY, "id:ns:baz::append_same", 2).toString(); assertTrue(containsSequenceId(trace, 1601327001)); assertEquals(3, countSequencerMessages(trace)); assertEquals(3, countMatches(trace, "Sequencer sending message with sequence id '1601327001'.")); } @Test public void testSequencing() throws InterruptedException { testNoExtraSequencingOfNormalOp(); testSequencingWhenChangingId(); testSequencingWhenAddingOtherId(); testSequencingWhenAddingSameId(); } private void putToManyAllInSameBucket() throws InterruptedException { assertPutMessages(createPutDocumentMessage(), TOMANYALLINSAMEBUCKET, "id:123456:baz:n=11111:foo:er:bra", "id:123456:baz:n=11111:foo:trallala", "id:123456:baz:n=11111:foo:a"); } private void putToManySomeInSameBucket() throws InterruptedException { assertPutMessages(createPutDocumentMessage(), TOMANYSOMEINSAMEBUCKET, "id:123456:baz:n=7890:bar:er:bra", "id:foo:baz::er:ja", "id:567890:baz:n=1234:a", "id:foo:baz::hahahhaa", "id:123456:baz:n=7890:a:a", "id:foo:baz::aa", "id:567890:baz:n=1234:bar:ala", "id:foo:baz::sdfgsaa", "id:123456:baz:n=7890:bar:tralsfa", "id:foo:baz::dfshaa"); } private void putToFourPuts() throws InterruptedException { assertPutMessages(createPutDocumentMessage(), TOMANY, "id:foo:baz::er:bra", "id:foo:baz::er:ja", "id:foo:baz::hahahhaa", "id:foo:baz::trallala"); } private void putToOne() throws InterruptedException { assertPutMessages(createPutDocumentMessage(), TOONE, "id:ns:baz::bar"); } private void putToZero() throws InterruptedException { assertTrue(sendMessage(TOZERO, createPutDocumentMessage())); Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); assertTrue(reply instanceof WriteDocumentReply); assertFalse(reply.hasErrors()); } @Override protected List> getCallStacks() { ArrayList> stacks = new ArrayList<>(6); stacks.add(new Pair<>(TOMANYALLINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsAllInSameBucketProcessor()))); stacks.add(new Pair<>(TOMANYSOMEINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsSomeInSameBucketProcessor()))); stacks.add(new Pair<>(TOMANY, new CallStack().addLast(new OneToManyDocumentsProcessor()))); stacks.add(new Pair<>(TOONE, new CallStack().addLast(new OneToOneDocumentsProcessor()))); stacks.add(new Pair<>(TOZERO, new CallStack().addLast(new OneToZeroDocumentsProcessor()))); stacks.add(new Pair<>(TOMULTIPLY, new CallStack().addLast(new MultiplyProcessor()))); return stacks; } protected PutDocumentMessage createPutDocumentMessage() { return createPutDocumentMessage("id:ns:baz::bar"); } protected PutDocumentMessage createPutDocumentMessage(String id) { Document document = new Document(getType(), id); document.setFieldValue("blahblah", new StringFieldValue("This is a test.")); return new PutDocumentMessage(new DocumentPut(document)); } private void assertPutMessages(DocumentMessage msg, String route, String... expected) throws InterruptedException { msg.getTrace().setLevel(9); assertTrue(sendMessage(route, msg)); String[] actual = new String[expected.length]; for (int i = 0; i < expected.length; ++i) { Message remoteMsg = remoteServer.awaitMessage(60, TimeUnit.SECONDS); assertTrue(remoteMsg instanceof PutDocumentMessage); remoteMsg.getTrace().trace(1, "remoteServer.ack(" + expected[i] + ")"); remoteServer.ackMessage(remoteMsg); actual[i] = ((PutDocumentMessage)remoteMsg).getDocumentPut().getDocument().getId().toString(); } assertNull(remoteServer.awaitMessage(100, TimeUnit.MILLISECONDS)); Arrays.sort(expected); Arrays.sort(actual); assertArrayEquals(expected, actual); Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); assertNotNull(reply); assertFalse(reply.hasErrors()); String trace = reply.getTrace().toString(); for (String documentId : expected) { assertTrue("missing trace for document '" + documentId + "'\n" + trace, trace.contains("remoteServer.ack(" + documentId + ")")); } if (expected.length == 1) { assertFalse("unexpected fork in trace for single document\n" + trace, trace.contains("")); } else { assertTrue("missing fork in trace for " + expected.length + " split\n" + trace, trace.contains("")); } } public static class OneToOneDocumentsProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { return Progress.DONE; } } public class OneToManyDocumentsProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { List operations = processing.getDocumentOperations(); operations.clear(); operations.add(new DocumentPut(type, "id:foo:baz::er:bra")); operations.add(new DocumentPut(type, "id:foo:baz::er:ja")); operations.add(new DocumentPut(type, "id:foo:baz::trallala")); operations.add(new DocumentPut(type, "id:foo:baz::hahahhaa")); return Progress.DONE; } } public static class OneToZeroDocumentsProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { processing.getDocumentOperations().clear(); return Progress.DONE; } } public class OneToManyDocumentsSomeInSameBucketProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { List operations = processing.getDocumentOperations(); operations.clear(); operations.add(new DocumentPut(type, "id:123456:baz:n=7890:bar:er:bra")); operations.add(new DocumentPut(type, "id:foo:baz::er:ja")); operations.add(new DocumentPut(type, "id:567890:baz:n=1234:a")); operations.add(new DocumentPut(type, "id:foo:baz::hahahhaa")); operations.add(new DocumentPut(type, "id:123456:baz:n=7890:a:a")); operations.add(new DocumentPut(type, "id:foo:baz::aa")); operations.add(new DocumentPut(type, "id:567890:baz:n=1234:bar:ala")); operations.add(new DocumentPut(type, "id:foo:baz::sdfgsaa")); operations.add(new DocumentPut(type, "id:123456:baz:n=7890:bar:tralsfa")); operations.add(new DocumentPut(type, "id:foo:baz::dfshaa")); return Progress.DONE; } } public class OneToManyDocumentsAllInSameBucketProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { List docs = processing.getDocumentOperations(); docs.clear(); docs.add(new DocumentPut(type, "id:123456:baz:n=11111:foo:er:bra")); docs.add(new DocumentPut(type, "id:123456:baz:n=11111:foo:trallala")); docs.add(new DocumentPut(type, "id:123456:baz:n=11111:foo:a")); return Progress.DONE; } } public class MultiplyProcessor extends DocumentProcessor { @Override public Progress process(Processing processing) { List docs = processing.getDocumentOperations(); assertEquals(1, docs.size()); DocumentId id = docs.get(0).getId(); if ("id:ns:baz::transform".equals(id.toString())) { docs.clear(); docs.add(new DocumentPut(type, "id:ns:baz::appended")); } else if ("id:ns:baz::append".equals(id.toString())) { docs.add(new DocumentPut(type, "id:ns:baz::appended")); } else if ("id:ns:baz::append_same".equals(id.toString())) { docs.add(new DocumentPut(type, "id:ns:baz::append_same")); } return Progress.DONE; } } }