// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.feed.perf; import com.yahoo.document.serialization.DeserializationException; import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; import com.yahoo.jrt.ListenFailedException; import com.yahoo.messagebus.DynamicThrottlePolicy; import com.yahoo.messagebus.EmptyReply; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.ErrorCode; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageHandler; import com.yahoo.messagebus.Reply; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; /** * @author Simon Thoresen Hult */ public class SimpleFeederTest { private static final String CONFIG_DIR = "target/test-classes/"; @Test public void requireThatXMLFeederWorks() throws Throwable { assertFeed("" + " " + " foo" + " " + " " + " bar" + " " + " " + "", new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = ((DocumentMessage)msg).createReply(); reply.swapState(msg); reply.popHandler().handleReply(reply); } }, "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); } @Test public void requireThatXML2JsonFeederWorks() throws Throwable { ByteArrayOutputStream dump = new ByteArrayOutputStream(); assertFeed(new FeederParams().setDumpStream(dump), "" + " " + " foo" + " " + " " + " bar" + " " + " " + "", new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = ((DocumentMessage)msg).createReply(); reply.swapState(msg); reply.popHandler().handleReply(reply); } }, "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); assertEquals(169, dump.size()); assertEquals(""" [ {"id":"id:simple:simple::0","fields":{"my_str":"foo"}}, {"update":"id:simple:simple::1","fields":{"my_str":{"assign":"bar"}}}, {"remove":"id:simple:simple::2"} ]""", dump.toString()); } @Test public void requireThatDualPutXML2JsonFeederWorks() throws Throwable { ByteArrayOutputStream dump = new ByteArrayOutputStream(); assertFeed(new FeederParams().setDumpStream(dump), "" + " " + " foo" + " " + " " + " bar" + " " + " " + "", new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = ((DocumentMessage)msg).createReply(); reply.swapState(msg); reply.popHandler().handleReply(reply); } }, "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); assertEquals(154, dump.size()); assertEquals(""" [ {"id":"id:simple:simple::0","fields":{"my_str":"foo"}}, {"id":"id:simple:simple::1","fields":{"my_str":"bar"}}, {"remove":"id:simple:simple::2"} ]""", dump.toString()); assertFeed(dump.toString(), new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = ((DocumentMessage)msg).createReply(); reply.swapState(msg); reply.popHandler().handleReply(reply); } }, "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); } @Test public void requireThatJson2VespaFeederWorks() throws Throwable { ByteArrayOutputStream dump = new ByteArrayOutputStream(); assertFeed(new FeederParams().setDumpStream(dump).setDumpFormat(FeederParams.DumpFormat.VESPA), "[" + " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," + " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," + " { \"remove\": \"id:simple:simple::2\", \"condition\":\"true\"}" + "]", new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = ((DocumentMessage)msg).createReply(); reply.swapState(msg); reply.popHandler().handleReply(reply); } }, "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); assertEquals(187, dump.size()); assertFeed(new ByteArrayInputStream(dump.toByteArray()), new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = ((DocumentMessage)msg).createReply(); reply.swapState(msg); reply.popHandler().handleReply(reply); } }, "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); } @Test public void requireThatJsonFeederWorks() throws Throwable { assertFeed("[" + " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," + " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," + " { \"remove\": \"id:simple:simple::2\"}" + "]", new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = ((DocumentMessage)msg).createReply(); reply.swapState(msg); reply.popHandler().handleReply(reply); } }, "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); } @Test public void requireThatParseFailuresThrowInMainThread() throws Throwable { TestDriver driver = new TestDriver(new FeederParams(), "" + " " + "", null); try { driver.run(); fail(); } catch (DeserializationException e) { assertEquals("Field 'id:scheme:simple::0': Must specify an existing document type, not 'unknown' (at line 1, column 83)", e.getMessage()); } assertTrue(driver.close()); } @Test public void requireThatSyncFailuresThrowInMainThread() throws Throwable { TestDriver driver = new TestDriver(new FeederParams(), "" + " " + "", null); driver.feeder.getSourceSession().close(); try { driver.run(); fail(); } catch (IOException e) { assertEquals("[SEND_QUEUE_CLOSED @ localhost]: Source session is closed.", e.getMessage()); } assertTrue(driver.close()); } @Test public void requireThatAsyncFailuresThrowInMainThread() throws Throwable { TestDriver driver = new TestDriver(new FeederParams(), "", new MessageHandler() { @Override public void handleMessage(Message msg) { Reply reply = new EmptyReply(); reply.swapState(msg); reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 6, "foo")); reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 9, "bar")); reply.popHandler().handleReply(reply); } }); try { driver.run(); fail(); } catch (IOException e) { assertMatches("com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage@.+\n" + "\\[UNKNOWN\\(250006\\) @ .+\\]: foo\n" + "\\[UNKNOWN\\(250009\\) @ .+\\]: bar\n", e.getMessage()); } assertTrue(driver.close()); } @Test public void requireThatDynamicThrottlingIsDefault() throws Exception { TestDriver driver = new TestDriver(new FeederParams(), "", null); assertEquals(DynamicThrottlePolicy.class, getThrottlePolicy(driver).getClass()); assertTrue(driver.close()); } @Test public void requireThatSerialTransferModeConfiguresStaticThrottling() throws Exception { TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(), "", null); assertEquals(StaticThrottlePolicy.class, getThrottlePolicy(driver).getClass()); assertTrue(driver.close()); } private static ThrottlePolicy getThrottlePolicy(TestDriver driver) { return (ThrottlePolicy)getField(driver.feeder.getSourceSession(), "throttlePolicy"); } private static Object getField(Object obj, String fieldName) { try { Field field = obj.getClass().getDeclaredField(fieldName); field.setAccessible(true); return field.get(obj); } catch (IllegalAccessException | NoSuchFieldException e) { throw new AssertionError(e); } } private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut); } private static void assertFeed(InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut); } private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { assertFeed(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator, expectedErr, expectedOut); } private static void assertFeed(FeederParams params, InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { TestDriver driver = new TestDriver(params, in, validator); driver.run(); assertMatches(expectedErr, driver.err.toString(StandardCharsets.UTF_8)); assertMatches(expectedOut, driver.out.toString(StandardCharsets.UTF_8)); assertTrue(driver.close()); } private static void assertMatches(String expected, String actual) { if (!Pattern.matches(expected, actual)) { assertEquals(expected, actual); } } private static class TestDriver { final ByteArrayOutputStream err = new ByteArrayOutputStream(); final ByteArrayOutputStream out = new ByteArrayOutputStream(); final SimpleFeeder feeder; final SimpleServer server; TestDriver(FeederParams params, String in, MessageHandler validator) throws IOException, ListenFailedException { this(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator); } TestDriver(FeederParams params, InputStream in, MessageHandler validator) throws IOException, ListenFailedException { server = new SimpleServer(CONFIG_DIR, validator); feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR) .setStdErr(new PrintStream(err)) .setInputStreams(List.of(in)) .setStdOut(new PrintStream(out))); } void run() throws Throwable { feeder.run(); } boolean close() throws Exception { feeder.close(); server.close(); return true; } } }