diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /docproc |
Publish
Diffstat (limited to 'docproc')
89 files changed, 7592 insertions, 0 deletions
diff --git a/docproc/.gitignore b/docproc/.gitignore new file mode 100644 index 00000000000..245fc81a1b5 --- /dev/null +++ b/docproc/.gitignore @@ -0,0 +1,7 @@ +target +docproc.iml +.settings +.classpath +.project +/tmp +/pom.xml.build diff --git a/docproc/OWNERS b/docproc/OWNERS new file mode 100644 index 00000000000..31af040f698 --- /dev/null +++ b/docproc/OWNERS @@ -0,0 +1 @@ +bratseth diff --git a/docproc/README b/docproc/README new file mode 100644 index 00000000000..d2b667c2565 --- /dev/null +++ b/docproc/README @@ -0,0 +1 @@ +Vespa Document Processing Framework (Java). diff --git a/docproc/pom.xml b/docproc/pom.xml new file mode 100644 index 00000000000..fe7228ba47d --- /dev/null +++ b/docproc/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0"?> +<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.vespa</groupId> + <artifactId>parent</artifactId> + <version>6-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <artifactId>docproc</artifactId> + <packaging>jar</packaging> + <version>6-SNAPSHOT</version> + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>provided-dependencies</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>component</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>container-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-bundle</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>messagebus-disc</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>container-messagebus</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>com.yahoo.vespa</groupId> + <artifactId>config-class-plugin</artifactId> + <version>${project.version}</version> + <executions> + <execution> + <id>config-gen</id> + <goals> + <goal>config-gen</goal> + </goals> + </execution> + <execution> + <id>configgen-test-defs</id> + <phase>generate-test-sources</phase> + <goals> + <goal>config-gen</goal> + </goals> + <configuration> + <defFilesDirectories>src/test/vespa-configdef</defFilesDirectories> + <outputDirectory>target/generated-test-sources/vespa-configgen-plugin</outputDirectory> + <testConfig>true</testConfig> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/docproc/src/main/java/com/yahoo/config/docproc/package-info.java b/docproc/src/main/java/com/yahoo/config/docproc/package-info.java new file mode 100644 index 00000000000..3a2b850c609 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/config/docproc/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.config.docproc; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java new file mode 100644 index 00000000000..c967eeedd20 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java @@ -0,0 +1,44 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.annotation.Annotation; +import com.yahoo.document.datatypes.Struct; +import com.yahoo.document.datatypes.StructuredFieldValue; +import com.yahoo.yolean.Exceptions; + +/** + * Subtyped by factory classes for concrete document types. The factory classes are auto-generated + * by vespa-documentgen-plugin. This superclass is used to manage the factories in OSGI. + * @author vegardh + * @since 5.1 + * + */ +public abstract class AbstractConcreteDocumentFactory extends com.yahoo.component.AbstractComponent { + public abstract Map<String, Class<? extends Document>> documentTypes(); + public abstract Map<String, Class<? extends Struct>> structTypes(); + public abstract Map<String, Class<? extends Annotation>> annotationTypes(); + + /** + * Used by the docproc framework to get an instance of a concrete document type without resorting to reflection in a bundle + * + * @return A concrete document instance + */ + public com.yahoo.document.Document getDocumentCopy(java.lang.String type, com.yahoo.document.datatypes.StructuredFieldValue src, com.yahoo.document.DocumentId id) { + // Note: This method can't be abstract because it must work with older bundles where the ConcreteDocumentFactory may not implement it. + // It is overridden to not use reflection by newer bundles. + // The implementation here is not so good in bundles, since it instantiates the doc using reflection. + // TODO: for 6.0: make this method abstract and throw away the code below. + Class<? extends Document> concreteClass = documentTypes().get(type); + try { + Constructor<? extends Document> copyCon = concreteClass.getConstructor(StructuredFieldValue.class, DocumentId.class); + return copyCon.newInstance(src, id); + } catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) { + throw new RuntimeException(Exceptions.toMessageString(e), e); + } + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/Accesses.java b/docproc/src/main/java/com/yahoo/docproc/Accesses.java new file mode 100644 index 00000000000..99f9cb0073c --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/Accesses.java @@ -0,0 +1,55 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Docprocs tagged with this will read and/or write annotations on the given field(s). + * @author vegardh + * + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +public @interface Accesses { + Field[] value(); + + /** + * Describes the annotations produced and consumed on one field in a document + * @author vegardh + * + */ + @Documented + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + public @interface Field { + /** The name of the document field */ + String name(); + /** The datatype of the field */ + String dataType(); + /** The trees of annotations that this docproc accesses on this field */ + Tree[] annotations() default {}; + String description(); + + /** + * Describes the annotations produced and consumed in one tree on a field + * @author vegardh + * + */ + @Documented + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + public @interface Tree { + /** The name of the tree */ + String name() default ""; + /** The annotation types that this docproc writes in this tree */ + String[] produces() default {}; + /** The annotation types that this docproc requires in this tree */ + String[] consumes() default {}; + } + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/Call.java b/docproc/src/main/java/com/yahoo/docproc/Call.java new file mode 100644 index 00000000000..6d5b25b92b5 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/Call.java @@ -0,0 +1,179 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.component.ComponentId; +import com.yahoo.docproc.jdisc.metric.NullMetric; +import com.yahoo.docproc.proxy.ProxyDocument; +import com.yahoo.docproc.proxy.ProxyDocumentUpdate; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.jdisc.Metric; +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.statistics.Counter; +import com.yahoo.statistics.Statistics; + +import java.util.List; + +/** + * A document processor to call - an item on a {@link com.yahoo.docproc.CallStack}. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a> + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class Call implements Cloneable { + private final DocumentProcessor processor; + private final Counter docCounter; + private final String docCounterName; + private final Counter procTimeCounter; + private final String procTimeCounterName; + private final Metric metric; + + public Call(DocumentProcessor processor) { + this(processor, Statistics.nullImplementation, new NullMetric()); + } + + /** Creates a new call to a processor with no arguments. + * @param processor the document processor to call */ + public Call(DocumentProcessor processor, Statistics manager, Metric metric) { + this(processor, "", manager, metric); + } + + public Call(DocumentProcessor processor, String chainName, Statistics manager, Metric metric) { + this.processor = processor; + if (chainName == null) { + chainName = ""; + } + chainName = chainName.replaceAll("[^\\p{Alnum}]", "_"); + docCounterName = "docprocessor_" + chainName + "_" + + getDocumentProcessorId().stringValue().replaceAll("[^\\p{Alnum}]", "_") + "_documents"; + procTimeCounterName = "docprocessor_" + chainName + "_" + + getDocumentProcessorId().stringValue().replaceAll("[^\\p{Alnum}]", "_") + "_proctime"; + docCounter = new Counter(docCounterName, manager, false); + procTimeCounter = new Counter(procTimeCounterName, manager, false, null, true); + this.metric = metric; + } + + @Override + public Object clone() { + try { + Call clone = (Call) super.clone(); + return clone; + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Will not happen"); + } + } + + /** + * Returns the processor to call. + * + * @return a reference to the processor to call + */ + public DocumentProcessor getDocumentProcessor() { + return processor; + } + + /** + * Returns the ID of the processor to call. + * + * @return the ID of the processor to call + */ + public ComponentId getDocumentProcessorId() { + return processor.getId(); + } + + /** + * The Document object the proc should work on. Normally the one in arguments, but could be a proxy object + * if schema mapping or @Accesses is in effect. + * + * <p> + * public for testing + */ + public DocumentPut configDoc(DocumentProcessor docProc, DocumentPut documentPut) { + if (!docProc.getFieldMap().isEmpty() || docProc.hasAnnotations()) { + Document document = documentPut.getDocument(); + document = new ProxyDocument(docProc, document, docProc.getDocMap(document.getDataType().getName())); + + DocumentPut newDocumentPut = new DocumentPut(document); + newDocumentPut.setCondition(documentPut.getCondition()); + documentPut = newDocumentPut; + } + + return documentPut; + } + + /** + * The DocumentUpdate object a processor should work on. The one in args, or schema mapped. + * + * @return a DocumentUpdate + */ + private DocumentUpdate configDocUpd(DocumentProcessor proc, DocumentUpdate docU) { + if (proc.getFieldMap().isEmpty()) return docU; + return new ProxyDocumentUpdate(docU, proc.getDocMap(docU.getDocumentType().getName())); + } + + private void schemaMapProcessing(Processing processing) { + final List<DocumentOperation> documentOperations = processing.getDocumentOperations(); + for (int i = 0; i < documentOperations.size(); i++) { + DocumentOperation op = documentOperations.get(i); + if (op instanceof DocumentPut) { + documentOperations.set(i, configDoc(processor, (DocumentPut) op)); + } else if (op instanceof DocumentUpdate) { + documentOperations.set(i, configDocUpd(processor, (DocumentUpdate) op)); + } + } + } + + + private void unwrapSchemaMapping(Processing processing) { + final List<DocumentOperation> documentOperations = processing.getDocumentOperations(); + + for (int i = 0; i < documentOperations.size(); i++) { + DocumentOperation documentOperation = documentOperations.get(i); + + if (documentOperation instanceof DocumentPut) { + DocumentPut putOperation = (DocumentPut) documentOperation; + + if (putOperation.getDocument() instanceof DocumentOperationWrapper) { + DocumentOperationWrapper proxy = (DocumentOperationWrapper) putOperation.getDocument(); + documentOperations.set(i, new DocumentPut(putOperation, ((DocumentPut)proxy.getWrappedDocumentOperation()).getDocument())); + } + } + } + } + + /** + * Call the DocumentProcessor of this call. + * + * @param processing the Processing object to use + * @return the progress of the DocumentProcessor that was called + */ + public DocumentProcessor.Progress call(Processing processing) { + try { + int numDocs = processing.getDocumentOperations().size(); + schemaMapProcessing(processing); + long startTime = SystemTimer.INSTANCE.milliTime(); + DocumentProcessor.Progress retval = processor.process(processing); + incrementProcTime(SystemTimer.INSTANCE.milliTime() - startTime); + incrementDocs(numDocs); + return retval; + } finally { + unwrapSchemaMapping(processing); + } + } + + public String toString() { + return "call to class " + processor.getClass().getName() + " (id: " + getDocumentProcessorId() + ")"; + } + + private void incrementDocs(long increment) { + docCounter.increment(increment); + metric.add(docCounterName, increment, null); + } + + private void incrementProcTime(long increment) { + procTimeCounter.increment(increment); + metric.add(procTimeCounterName, increment, null); + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/CallStack.java b/docproc/src/main/java/com/yahoo/docproc/CallStack.java new file mode 100644 index 00000000000..8fbe72577d3 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/CallStack.java @@ -0,0 +1,402 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.component.ComponentId; +import com.yahoo.docproc.jdisc.metric.NullMetric; +import com.yahoo.jdisc.Metric; +import com.yahoo.statistics.Statistics; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; + +/** + * <p> + * A stack of the processors to call next in this processing. To push which + * processor to call next, call addNext, to get and remove the next processor, + * call pop. + * </p> + * + * <p> + * This is not thread safe. + * </p> + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class CallStack { + + /** The name of this stack, or null if it is not named */ + private String name = null; + + /** The Call objects of this stack */ + private final List<Call> elements = new java.util.LinkedList<>(); + + /** The last element popped from the call stack, if any */ + private Call lastPopped = null; + + /** Used for creating counters in Call */ + private final Statistics statistics; + + /** Used for metrics in Call */ + private final Metric metric; + + public CallStack() { + this(null, Statistics.nullImplementation, new NullMetric()); + } + + public CallStack(String name) { + this(name, Statistics.nullImplementation, new NullMetric()); + } + + /** Creates an empty stack */ + public CallStack(Statistics statistics, Metric metric) { + this(null, statistics, metric); + } + + /** Creates an empty stack with a name */ + public CallStack(final String name, Statistics manager, Metric metric) { + this.name = name; + this.statistics = manager; + this.metric = metric; + } + + /** + * Creates a stack from another stack (starting at the next of the given + * callstack) This does a deep copy of the stack. + */ + public CallStack(final CallStack stackToCopy) { + name = stackToCopy.name; + for (final Iterator<Call> i = stackToCopy.iterator(); i.hasNext();) { + final Call callToCopy = i.next(); + elements.add((Call) callToCopy.clone()); + } + this.statistics = stackToCopy.statistics; + this.metric = stackToCopy.metric; + } + + /** + * Creates a stack (with a given name) based on a collection of document processors, which are added to the stack + * in the iteration order of the collection. + * + * @param name the name of the stack + * @param docprocs the document processors to call + */ + public CallStack(final String name, Collection<DocumentProcessor> docprocs, Statistics manager, Metric metric) { + this(name, manager, metric); + for (DocumentProcessor docproc : docprocs) { + addLast(docproc); + } + } + + /** Returns the name of this stack, or null if it is not named */ + public String getName() { + return name; + } + + /** Sets the name of this stack */ + public void setName(final String name) { + this.name = name; + } + + /** + * Push an element as the <i>next</i> element on this stack + * + * @return this for convenience + */ + public CallStack addNext(final Call call) { + elements.add(0, call); + return this; + } + + /** + * Push an element as the <i>next</i> element on this stack + * + * @return this for convenience + */ + public CallStack addNext(final DocumentProcessor processor) { + return addNext(new Call(processor, name, statistics, metric)); + } + + /** + * Push multiple elements as the <i>next</i> elements on this stack + * + * @return this for convenience + */ + public CallStack addNext(final CallStack callStack) { + elements.addAll(0, callStack.elements); + return this; + } + + /** + * Adds an element as the <i>last</i> element on this stack + * + * @return this for convenience + */ + public CallStack addLast(final Call call) { + elements.add(call); + return this; + } + + /** + * Adds an element as the <i>last</i> element on this stack + * + * @return this for convenience + */ + public CallStack addLast(final DocumentProcessor processor) { + return addLast(new Call(processor, name, statistics, metric)); + } + + /** + * Adds multiple elements as the <i>last</i> elements on this stack + * + * @return this for convenience + */ + public CallStack addLast(final CallStack callStack) { + elements.addAll(callStack.elements); + return this; + } + + /** + * Adds an element just before the first occurence of some other element on + * the stack. This can not be called during an iteration. + * + * @param before + * the call to add this before. If this call is not present (the + * same object instance), new processor is added as the last + * element + * @param call the call to add + * @return this for convenience + */ + public CallStack addBefore(final Call before, final Call call) { + final int insertPosition = elements.indexOf(before); + if (insertPosition < 0) { + addLast(call); + } else { + elements.add(insertPosition, call); + } + return this; + } + + /** + * Adds an element just before the first occurence of some element on the + * stack. This can not be called during an iteration. + * + * @param before + * the call to add this before. If this call is not present (the + * same object instance), the new processor is added as the last + * element + * @param processor the processor to add + * @return this for convenience + */ + public CallStack addBefore(final Call before, DocumentProcessor processor) { + return addBefore(before, new Call(processor, name, statistics, metric)); + } + + /** + * Adds multiple elements just before the first occurence of some element on + * the stack. This can not be called during an iteration. + * + * @param before + * the call to add this before. If this call is not present (the + * same object instance), the new processor is added as the last + * element + * @param callStack + * the calls to add + * @return this for convenience + */ + public CallStack addBefore(final Call before, final CallStack callStack) { + final int insertPosition = elements.indexOf(before); + if (insertPosition < 0) { + addLast(callStack); + } else { + elements.addAll(insertPosition, callStack.elements); + } + return this; + } + + /** + * Adds an element just after the first occurence of some other element on + * the stack. This can not be called during an iteration. + * + * @param after + * the call to add this before. If this call is not present, (the + * same object instance), the new processor is added as the last + * element + * @param call + * the call to add + * @return this for convenience + */ + public CallStack addAfter(final Call after, final Call call) { + final int insertPosition = elements.indexOf(after); + if (insertPosition < 0) { + addLast(call); + } else { + elements.add(insertPosition + 1, call); + } + return this; + } + + /** + * Adds an element just after the first occurence of some other element on + * the stack. This can not be called during an iteration. + * + * @param after + * the call to add this after. If this call is not present, (the + * same object instance), the new processor is added as the last + * element + * @param processor + * the processor to add + * @return this for convenience + */ + public CallStack addAfter(final Call after, final DocumentProcessor processor) { + return addAfter(after, new Call(processor, name, statistics, metric)); + } + + /** + * Adds multiple elements just after another given element on the stack. + * This can not be called during an iteration. + * + * @param after + * the call to add this before. If this call is not present, (the + * same object instance), the new processor is added as the last + * element + * @param callStack + * the calls to add + * @return this for convenience + */ + public CallStack addAfter(final Call after, final CallStack callStack) { + final int insertPosition = elements.indexOf(after); + if (insertPosition < 0) { + addLast(callStack); + } else { + elements.addAll(insertPosition + 1, callStack.elements); + } + return this; + } + + /** + * Removes the given call. Does nothing if the call is not present. + * + * @param call + * the call to remove + * @return this for convenience + */ + public CallStack remove(final Call call) { + for (final ListIterator<Call> i = iterator(); i.hasNext();) { + final Call current = i.next(); + if (current == call) { + i.remove(); + } + } + return this; + } + + /** + * Returns whether this stack has this call (left) + * + * @param call + * the call to check + * @return true if the call is present, false otherwise + */ + public boolean contains(final Call call) { + for (final ListIterator<Call> i = iterator(); i.hasNext();) { + final Call current = i.next(); + if (current == call) { + return true; + } + } + return false; + } + + /** + * Returns the next call to this processor id, or null if no such calls are + * left + */ + public Call findCall(final ComponentId processorId) { + for (final Iterator<Call> i = iterator(); i.hasNext();) { + final Call call = i.next(); + if (call.getDocumentProcessorId().equals(processorId)) { + return call; + } + } + return null; + } + + /** + * Returns the next call to this processor, or null if no such calls are + * left + */ + public Call findCall(final DocumentProcessor processor) { + return findCall(processor.getId()); + } + + /** + * Returns and removes the next element, or null if there are no more elements + */ + public Call pop() { + if (elements.isEmpty()) return null; + lastPopped = elements.remove(0); + return lastPopped; + } + + /** + * Returns the next element without removing it, or null if there are no + * more elements + */ + public Call peek() { + if (elements.isEmpty()) return null; + return elements.get(0); + } + + /** + * Returns the element that was last popped from this stack, or null if none + * have been popped or the stack is empty + */ + public Call getLastPopped() { + return lastPopped; + } + + public void clear() { + elements.clear(); + } + + /** + * Returns a modifiable ListIterator over all the remaining elements of this + * stack, starting by the next element + */ + public ListIterator<Call> iterator() { + return elements.listIterator(); + } + + /** Returns the number of remainnig elements in this stack */ + public int size() { + return elements.size(); + } + + @Override + public String toString() { + final StringBuffer buffer = new StringBuffer(); + buffer.append("callstack"); + if (name != null) { + buffer.append(" "); + buffer.append(name); + } + buffer.append(":"); + for (final Iterator<Call> i = iterator(); i.hasNext();) { + buffer.append("\n"); + buffer.append(" "); + buffer.append(i.next().toString()); + } + buffer.append("\n"); + return buffer.toString(); + } + + public Statistics getStatistics() { + return statistics; + } + + public Metric getMetric() { + return metric; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java b/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java new file mode 100644 index 00000000000..225f62ed2ef --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java @@ -0,0 +1,185 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.json.JsonWriter; +import com.yahoo.jdisc.Metric; +import com.yahoo.log.LogLevel; +import com.yahoo.statistics.Counter; +import com.yahoo.text.Utf8; + +import java.util.Collections; +import java.util.logging.Logger; + +/** + * An executor executed incoming processings on its CallStack + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocprocExecutor { + + private final static String METRIC_NAME_DOCUMENTS_PROCESSED = "documents_processed"; + + private static final Logger log = Logger.getLogger(DocprocExecutor.class.getName()); + + private final String name; + private final String docCounterName; + private final Counter docCounter; + private final Metric metric; + private Metric.Context context; + private final CallStack callStack; + + /** + * Creates a new named DocprocExecutor with the given CallStack. + * + * @param name the name of this executor + * @param callStack the chain of document processors this executor shall execute on processings + */ + public DocprocExecutor(String name, CallStack callStack) { + this.name = name; + String chainDimension = name; + if (name != null) { + chainDimension = name.replaceAll("[^\\p{Alnum}]", "_"); + docCounterName = "chain_" + name.replaceAll("[^\\p{Alnum}]", "_") + "_documents"; + } else { + //name is null + docCounterName = "chain_" + name + "_documents"; + } + docCounter = new Counter(docCounterName, callStack.getStatistics(), false); + this.metric = callStack.getMetric(); + this.callStack = callStack; + this.callStack.setName(name); + this.context = this.metric.createContext(Collections.singletonMap("chain", chainDimension)); + } + + /** + * Creates a new named DocprocExecutor, with the same instance variables as the given executor, + * but a new call stack. + * + * @param oldExecutor the executor to inherit the instance variables from, sans call stack. + * @param callStack the call stack to use. + */ + public DocprocExecutor(DocprocExecutor oldExecutor, CallStack callStack) { + this.name = oldExecutor.name; + this.docCounterName = oldExecutor.docCounterName; + this.docCounter = oldExecutor.docCounter; + this.metric = oldExecutor.metric; + this.context = oldExecutor.context; + this.callStack = callStack; + } + + public CallStack getCallStack() { + return callStack; + } + + public String getName() { + return name; + } + + private void incrementNumDocsProcessed(int num) { + docCounter.increment(num); + metric.add(docCounterName, num, null); + metric.add(METRIC_NAME_DOCUMENTS_PROCESSED,num,this.context); + } + + private void incrementNumDocsProcessed(Processing processing) { + int increment = processing.getNumDocsToBeProcessed(); + if (increment != 0) { + incrementNumDocsProcessed(increment); + } + } + + /** + * Processes a given Processing through the CallStack of this executor. + * + * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. + * @return a Progress; if this is LATER, the Processing is not done and must be reprocessed later. + * @throws RuntimeException if a document processor throws an exception during processing. + * @see com.yahoo.docproc.Processing + */ + public DocumentProcessor.Progress process(Processing processing) { + processing.setServiceName(getName()); + if (processing.callStack() == null) { + processing.setCallStack(new CallStack(getCallStack())); + } + + DocumentProcessor.Progress progress = DocumentProcessor.Progress.DONE; + //metrics stuff: + //TODO: Note that this is *wrong* in case of Progress.LATER, documents are then counted several times until the Processing is DONE or FAILED. + incrementNumDocsProcessed(processing); + do { + Call call = processing.callStack().pop(); + if (call == null) { + // No more processors - done + return progress; + } + + progress = DocumentProcessor.Progress.DONE; + //might throw exception, which is OK: + progress = call.call(processing); + + if (log.isLoggable(LogLevel.SPAM)) { + logProgress(processing, progress, call); + } + + if (DocumentProcessor.Progress.LATER.equals(progress)) { + processing.callStack().addNext(call); + return progress; + } + } while (DocumentProcessor.Progress.DONE.equals(progress)); + return progress; + } + + private void logProgress(Processing processing, DocumentProcessor.Progress progress, Call call) { + StringBuilder message = new StringBuilder(); + boolean first = true; + message.append(call.getDocumentProcessorId()).append(" of class ") + .append(call.getDocumentProcessor().getClass().getSimpleName()).append(" returned ").append(progress) + .append(" for the documents: ["); + for (DocumentOperation op : processing.getDocumentOperations()) { + if (first) { + first = false; + } else { + message.append(", "); + } + if (op instanceof DocumentPut) { + message.append(Utf8.toString(JsonWriter.toByteArray(((DocumentPut) op).getDocument()))); + } else { + message.append(op.toString()); + } + } + message.append("]"); + log.log(LogLevel.SPAM, message.toString()); + } + + /** + * Processes a given Processing through the CallStack of this executor. Note that if a DocumentProcessor + * returns a LaterProgress for this processing, it will be re-processed (after waiting the specified delay given + * by the LaterProgress), until done or failed. + * + * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null. + * @return a Progress; this is never a LaterProgress. + * @throws RuntimeException if a document processor throws an exception during processing, or this thread is interrupted while waiting. + * @see com.yahoo.docproc.Processing + * @see com.yahoo.docproc.DocumentProcessor.Progress + * @see com.yahoo.docproc.DocumentProcessor.LaterProgress + */ + public DocumentProcessor.Progress processUntilDone(Processing processing) { + DocumentProcessor.Progress progress; + while (true) { + progress = process(processing); + if (!(progress instanceof DocumentProcessor.LaterProgress)) { + break; + } + DocumentProcessor.LaterProgress later = (DocumentProcessor.LaterProgress) progress; + try { + Thread.sleep(later.getDelay()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return progress; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java new file mode 100644 index 00000000000..3d113fecfc5 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java @@ -0,0 +1,373 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.component.AbstractComponent; +import com.yahoo.component.ComponentId; +import com.yahoo.docproc.proxy.SchemaMap; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentTypeManager; + +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * <p>The document processing service. + * Use this to set up a document processing chain and to + * process documents using that chain. Note that there may + * be multiple named instances of this service in the same + * runtime. The default service is called "default" and is always present.</p> + * + * <p>To create a server which receives documents from the network + * and processes them, have a look at com.yahoo.docproc.server.Server.</p> + * + * <p>This class is thread safe.</p> + * + * @author bratseth + */ +public class DocprocService extends AbstractComponent { + + private static Logger log = Logger.getLogger(DocprocService.class.getName()); + private volatile DocprocExecutor executor; + /** + * The processings currently in progress at this service + */ + private final LinkedBlockingQueue<Processing> queue; + /** The current state of this service */ + private boolean inService = false; + /** The current state of this service */ + private boolean acceptingNewProcessings = true; + public static SchemaMap schemaMap = new SchemaMap(null); + private DocumentTypeManager documentTypeManager = null; + + public DocprocService(ComponentId id) { + super(id); + queue = new LinkedBlockingQueue<>(); + } + + /** + * Creates a new docproc service, which is set to be in service. + * + * @param id the component id of the new service. + * @param stack the call stack to use. + * @param mgr the document type manager to use. + */ + public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) { + this(id); + setCallStack(stack); + setDocumentTypeManager(mgr); + setInService(true); + } + + /** + * Creates a service with a name with an unbounded input queue. If the given name is null or the empty string, + * it will become the name "default". + */ + public DocprocService(String name) { + this(new ComponentId(name, null)); + } + + public DocumentTypeManager getDocumentTypeManager() { + return documentTypeManager; + } + + public void setDocumentTypeManager(DocumentTypeManager documentTypeManager) { + this.documentTypeManager = documentTypeManager; + } + + public int getQueueSize() { + return queue.size(); + } + + /** + * Returns the DocprocExecutor of this DocprocService. This can be used to + * synchronously process one Processing. + * + * @return the DocprocExecutor of this DocprocService, or null if a CallStack has not yet been set. + */ + public DocprocExecutor getExecutor() { + return executor; + } + + private void setExecutor(DocprocExecutor executor) { + this.executor = executor; + } + + /** + * Sets whether this should currently perform any processing. + * New processings will be accepted also when this is out of service, + * but no processing will happen when it is out of service. + */ + public void setInService(boolean inService) { + this.inService = inService; + } + + /** + * Returns true if this is currently processing incoming processings + * (in service), or false if they are just queued up (out of service). + * By default, this is out of service. + */ + public boolean isInService() { + return inService; + } + + /** + * Returns true if this service currently accepts new incoming processings via process(...). Default is true. + * + * @return true if accepting new incoming processings + */ + public boolean isAcceptingNewProcessings() { + return acceptingNewProcessings; + } + + /** + * Sets whether this service should accept new incoming processings via process(...). + * + * @param acceptingNewProcessings true if service should accept new incoming processings + */ + public void setAcceptingNewProcessings(boolean acceptingNewProcessings) { + this.acceptingNewProcessings = acceptingNewProcessings; + } + + public String getName() { + return getId().stringValue(); + } + + /** + * Returns the processing chain of this service. This stack can not be modified. + * To change the stack, set a new one. + * TODO: Enforce unmodifiability + */ + public CallStack getCallStack() { + DocprocExecutor ex = getExecutor(); + return (ex == null) ? null : ex.getCallStack(); + } + + /** + * Sets a new processing stack for this service. This will be the Prototype + * for the call stacks of individual processings in this service + */ + public void setCallStack(CallStack stack) { + DocprocExecutor ex = ((getExecutor() == null) ? new DocprocExecutor(getName(), stack) : new DocprocExecutor(getExecutor(), stack)); + setExecutor(ex); + } + + /** + * Asynchronously process the given Processing using the processing + * chain of this service, and call the specified ProcessingEndpoint when done. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(Processing processing, ProcessingEndpoint endp) { + processing.setServiceName(getName()); + processing.setCallStack(new CallStack(getCallStack())); + processing.setEndpoint(endp); + addProcessing(processing); + } + + /** + * Asynchronously process the given Processing using the processing + * chain of this service + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(Processing processing) { + process(processing, null); + } + + /** + * Asynchronously process the given document put or document update using the processing + * chain of this service, and call the specified ProcessingEndpoint when done. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(DocumentOperation documentOperation, ProcessingEndpoint endp) { + addProcessing(new Processing(getName(), documentOperation, new CallStack(getCallStack()), endp)); + } + + /** + * Asynchronously process the given document operation using the processing + * chain of this service. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void process(DocumentOperation documentOperation) { + process(documentOperation, null); + } + + /** + * Asynchronously process the given document operations as one unit + * using the processing chain of this service, + * and call the specified ProcessingEndpoint when done. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void processDocumentOperations(List<DocumentOperation> documentOperations, ProcessingEndpoint endp) { + addProcessing(Processing.createProcessingFromDocumentOperations(getName(), documentOperations, new CallStack(getCallStack()), endp)); + } + + /** + * Asynchronously process the given document operations as one unit + * using the processing chain of this service. + * + * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full + * @throws IllegalStateException if this DocprocService is not accepting new incoming processings + */ + public void processDocumentOperations(List<DocumentOperation> documentOperations) { + processDocumentOperations(documentOperations, null); + } + + private void addProcessing(Processing processing) { + if (!isAcceptingNewProcessings()) { + throw new IllegalStateException("Docproc service " + getName() + " is not accepting new incoming processings. Cannot add " + processing + " "); + } + + if (!queue.offer(processing)) { + throw new RejectedExecutionException("Docproc service " + getName() + " is busy, please try later"); + } + } + + /** + * <p>Do some work in this service. This will perform some processing and return + * in a "short" while, as long as individual processors also returns.</p> + * + * <p>This method is thread safe - multiple threads may call doWork at any time. + * Note that processors + * should be non-blocking, so multiple threads should be used primarily to + * utilize multiple processors.</p> + * + * @return true if some work was performed, false if no work could be performed + * at this time, because there are no outstanding processings, or because + * this is out of service. Note that since processings may arrive or be put + * back by another thread at any time, this return value does not mean + * that no work will be available if doWork as called again immediately. + */ + public boolean doWork() { + try { + return doWork(false); + } catch (InterruptedException e) { + //will never happen because we are not blocking + throw new RuntimeException(e); + } + } + + private boolean doWork(boolean blocking) throws InterruptedException { + Processing processing; + if (blocking) { + processing = queue.take(); + } else { + processing = queue.poll(); + } + + if (processing == null) { + //did no work, returning nothing to queue + return false; + } + if (!isInService()) { + //did no work, returning processing (because it's not empty) + queue.add(processing); //always successful, since queue is unbounded + return false; + } + + boolean remove = workOn(processing); //NOTE: Exceptions are handled in here, but not Errors + if (!remove) { + queue.add(processing); //always successful, since queue is unbounded + } + return true; + //NOTE: We *could* have called returnProcessing() in a finally block here, but we don't + //want that, since the only thing being thrown out here is Errors, and then the Processing + //can just disappear instead + } + + /** + * <p>Do some work in this service. This will perform some processing and return + * in a "short" while, as long as individual processors also returns. Note that + * if the internal queue is empty when this method is called, it will block until + * some work is submitted by a call to process() by another thread.</p> + * + * <p>This method is thread safe - multiple threads may call doWorkBlocking at any time. + * Note that processors + * should be non-blocking, so multiple threads should be used primarily to + * utilize multiple processors.</p> + * + * @return always true, since if the internal queue is empty when this method is + * called, it will block until some work is submitted by a call to + * process() by another thread. + * @throws InterruptedException if a call to this method is interrupted while waiting for data to become available + */ + public boolean doWorkBlocking() throws InterruptedException { + return doWork(true); + } + + /** + * Do some work on this processing. Must only be called from the worker thread. + * + * @return true if this processing should be removed, false if there is more work to do on it later + * @throws NoCallStackException if no CallStack has been set on this executor. + */ + boolean workOn(Processing processing) { + DocprocExecutor ex = getExecutor(); + if (ex == null) { + throw new NoCallStackException(); + } + + DocumentProcessor.Progress progress; + + try { + progress = ex.process(processing); + } catch (Exception e) { + processingFailed(processing, processing + " failed", e); + return true; + } + + if (DocumentProcessor.Progress.DONE.equals(progress)) { + //notify endpoint + ProcessingEndpoint recv = processing.getEndpoint(); + if (recv != null) { + recv.processingDone(processing); + } + return true; + } else if (DocumentProcessor.Progress.FAILED.equals(progress)) { + processingFailed(processing, processing + " failed at " + processing.callStack().getLastPopped(), null); + return true; + } else if (DocumentProcessor.Progress.PERMANENT_FAILURE.equals(progress)) { + processingFailed(processing, + processing + " failed PERMANENTLY at " + processing.callStack().getLastPopped() + ", disabling processing service.", null); + setInService(false); + return true; + } else { + //LATER: + return false; + } + } + + private void processingFailed(Processing processing, String errorMsg, Exception e) { + if (e != null) { + if (e instanceof HandledProcessingException) { + errorMsg += ". Error message: " + e.getMessage(); + log.log(Level.WARNING, errorMsg); + log.log(Level.FINE, "Chained exception:", e); + } else { + log.log(Level.WARNING, errorMsg, e); + } + } else { + log.log(Level.WARNING, errorMsg); + } + + //notify endpoint + ProcessingEndpoint recv = processing.getEndpoint(); + if (recv != null) { + recv.processingFailed(processing, e); + } + } + + private class NoCallStackException extends RuntimeException { + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java b/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java new file mode 100644 index 00000000000..560022dca1b --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java @@ -0,0 +1,11 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DocumentOperation; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public interface DocumentOperationWrapper { + public DocumentOperation getWrappedDocumentOperation(); +} diff --git a/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java new file mode 100644 index 00000000000..1ad745ac1b7 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java @@ -0,0 +1,183 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.collections.Pair; +import com.yahoo.component.ComponentId; +import com.yahoo.component.Version; +import com.yahoo.component.chain.ChainedComponent; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.logging.Logger; + +/** + * <p>A document processor is a component which performs some + * operation on a document or document update. Document processors are asynchronous, + * they may request some data and then return. The processing framework + * is responsible for calling processors again at unspecified times + * until they are done processing the document or document update.</p> + * + * <p>Document processor instances + * are chained together by the framework to realize a complete processing pipeline. + * The processing chain is represented by the processor instances themselves, see + * getNext/setNext. Document processors may optionally control the routing + * through the chain by setting the next processor on ongoing processings.</p> + * + * <p>A processing may contain one or multiple documents or document updates. Document processors + * may optionally handle collections of processors in some other way than just + * processing each one in order.</p> + * + * <p>A document processor <i>must</i> have an empty constructor. When instantiated + * from Vespa config (as opposed to being instantiated programmatically in a stand-alone + * Docproc system), the framework is responsible for configuring the processor using + * setConfig(). If a document processor wants to do some initial setup after configuration + * has been set, but before it has begun processing documents or document updates, it should + * override initialize(). </p> + * + * <p>Document processors must be thread safe. To ensure this, make sure that + * access to any mutable, thread-unsafe state held in a field by the processor is + * synchronized.</p> + * + * @author <a href="bratseth@yahoo-inc.com">Jon S Bratseth</a> + */ +public abstract class DocumentProcessor extends ChainedComponent { + static Logger log = Logger.getLogger(DocprocService.class.getName()); + + /** The number of instances of this processor */ + private static int instanceCounter = 1; + + /** Schema map for doctype-fieldnames */ + private Map<Pair<String,String>, String> fieldMap = new HashMap<>(); + + /** For a doc type, the actual field name mapping to do */ + // TODO how to flush this when reconfig of schemamapping? Must solve + private Map<String, Map<String, String>> docMapCache = new HashMap<>(); + + final boolean hasAnnotations; + + public DocumentProcessor() { + hasAnnotations = getClass().getAnnotation(Accesses.class) != null; + } + + final boolean hasAnnotations() { + return hasAnnotations; + } + + /** + * Processes a processing, which can contain zero or more document bases. The implementing document processor + * is free to modify, replace or delete elements in the list inside processing. + * + * @param processing the processing to process + * @return the outcome of this processing + */ + public abstract Progress process(Processing processing); + + public String toString() { + return "processor " + getId().stringValue(); + } + + /** An enumeration of possible results of calling a process method */ + public static class Progress { + + /** Returned by a processor when it is done with a processing */ + public static final Progress DONE = new Progress("done"); + + /** + * Returned by a processor when it should be called again later + * for the same processing + */ + public static final Progress LATER = new LaterProgress(); + + /** + * Returned by a processor when a processing has failed + * and it should not be called again for this processing. + */ + public static final Progress FAILED = new Progress("failed"); + + /** + * Returned by a processor when processing has permanently failed, + * so that the document processing service should disable itself until + * reconfigured or restarted. + */ + public static final Progress PERMANENT_FAILURE = new Progress("permanent_failure"); + + private String name; + + protected Progress(String name) { + this.name = name; + } + + public static Progress later(long delay) { + return new LaterProgress(delay); + } + + public String toString() { + return name; + } + + public boolean equals(Object object) { + return object instanceof Progress && ((Progress) object).name.equals(this.name); + } + + public int hashCode() { + return name.hashCode(); + } + } + + public static final class LaterProgress extends Progress { + private final long delay; + public static final long DEFAULT_LATER_DELAY = 20; //ms + + private LaterProgress() { + this(DEFAULT_LATER_DELAY); + } + + private LaterProgress(long delay) { + super("later"); + this.delay = delay; + } + + public long getDelay() { + return delay; + } + } + + /** Sets the schema map for field names */ + public void setFieldMap(Map<Pair<String, String>, String> fieldMap) { + this.fieldMap = fieldMap; + + } + + /** Schema map for field names + * (doctype,from)→to + * + */ + public Map<Pair<String, String>, String> getFieldMap() { + return fieldMap; + } + + public Map<String, String> getDocMap(String docType) { + Map<String, String> cached = docMapCache.get(docType); + if (cached!=null) { + return cached; + } + Map<String, String> ret = new HashMap<>(); + for (Entry<Pair<String, String>, String> e : fieldMap.entrySet()) { + // Remember to include tuple if doctype is unset in mapping + if (docType.equals(e.getKey().getFirst()) || e.getKey().getFirst()==null || "".equals(e.getKey().getFirst())) { + ret.put(e.getKey().getSecond(), e.getValue()); + } + } + docMapCache.put(docType, ret); + return ret; + } + + private void dumpMap(Map<?, String> m) { + System.out.print("["); + for (Map.Entry<?, String> e : m.entrySet()) { + System.out.print("("+e.getKey()+"->"+e.getValue()+")"); + } + System.out.print("]\n"); + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java b/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java new file mode 100644 index 00000000000..33636cf17ab --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +/** + * Exception generated by known bad input in a docproc. Will cause only message to be logged, + * not stacktrace. + * + * @author <a href="mailto:mathiasm@yahoo-inc.com">Mathias M\u00F8lster Lidal</a> + */ +public class HandledProcessingException extends RuntimeException { + + public HandledProcessingException(String message) { + super(message); + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/Processing.java b/docproc/src/main/java/com/yahoo/docproc/Processing.java new file mode 100644 index 00000000000..4f18a8372d8 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/Processing.java @@ -0,0 +1,286 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.document.DocumentOperation; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * A document processing. This contains the document(s) or document update(s) to process, + * a map of processing context data and the processing instance to + * invoke the next time any work needs to be done on this processing. + * + * @author bratseth + */ +public class Processing { + /** + * The name of the service which owns this processing. + * Null is the same as "default" + */ + private String service = null; + + /** The processors to call the next work is done on this processing */ + private CallStack callStack = null; + + /** The collection of documents or document updates processed by this. This is never null */ + private List<DocumentOperation> documentOperations; + + /** + * Documents or document updates which should be added to <code>documents</code> before + * the next access, or null if documents or document updates have never been added to + * this processing + */ + private List<DocumentOperation> documentsToAdd = null; + + /** The processing context variables */ + private Map<String, Object> context = null; + + /** The endpoint of this processing. */ + private ProcessingEndpoint endpoint = null; + + /** The registry of docproc services. */ + ComponentRegistry<DocprocService> docprocServiceRegistry = null; + private boolean getNumDocsCalled = false; + + /** + * Create a Processing with no documents. Useful with DocprocService.process(Processing). + * Note that the callstack is initially empty when using this constructor (but it is + * set by DocprocService.process(Processing).) + */ + public Processing() { + this.documentOperations = new ArrayList<>(1); + } + + /** + * Create a Processing from the given document operation + */ + public static Processing of(DocumentOperation documentOperation) { + return new Processing(documentOperation); + } + + /** + * Create a Processing from the given document operation + * @deprecated Use {@link #of(DocumentOperation)} instead + */ + @Deprecated + @SuppressWarnings("unused") + public static Processing fromDocumentOperation(DocumentOperation documentOperation) { + return Processing.of(documentOperation); + } + + private Processing(DocumentOperation documentOperation) { + this(); + addDocumentOperation(documentOperation); + } + + /** + * Create a processing with one document. The given document put or document update will be the single + * element in <code>documentOperations</code>. + * + * @param service the unique name of the service processing this + * @param documentOperation document operation (DocumentPut or DocumentUpdate) + * @param callStack the document processors to call in this processing. + * @param endp the endpoint of this processing + */ + Processing(String service, DocumentOperation documentOperation, CallStack callStack, ProcessingEndpoint endp) { + this.service = service; + this.documentOperations = new ArrayList<>(1); + documentOperations.add(documentOperation); + this.callStack = callStack; + this.endpoint = endp; + } + + /** + * Create a processing with one document. The given document put or document update will be the single + * element in <code>documentOperations</code>. + * + * @param service the unique name of the service processing this + * @param documentOperation document operation (DocumentPut or DocumentUpdate) + * @param callStack the document processors to call in this processing. + * This <b>tranfers ownership</b> of this structure + * to this class. The caller <i>must not</i> modify it + */ + public Processing(String service, DocumentOperation documentOperation, CallStack callStack) { + this(service, documentOperation, callStack, null); + } + + @SuppressWarnings("unused") + private Processing(String service, List<DocumentOperation> documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp, boolean unused) { + this.service = service; + this.documentOperations = new ArrayList<>(documentOpsAndUpdates.size()); + documentOperations.addAll(documentOpsAndUpdates); + this.callStack = callStack; + this.endpoint = endp; + } + + static Processing createProcessingFromDocumentOperations(String service, List<DocumentOperation> documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp) { + return new Processing(service, documentOpsAndUpdates, callStack, endp, false); + } + + /** + * + * @param service the unique name of the service processing this + * @param documentsAndUpdates the document operation list. This <b>transfers ownership</b> of this list + * to this class. The caller <i>must not</i> modify it + * @param callStack the document processors to call in this processing. + * This <b>transfers ownership</b> of this structure + * to this class. The caller <i>must not</i> modify it + */ + public static Processing createProcessingFromDocumentOperations(String service, List<DocumentOperation> documentsAndUpdates, CallStack callStack) { + return new Processing(service, documentsAndUpdates, callStack, null, false); + } + + public ComponentRegistry<DocprocService> getDocprocServiceRegistry() { + return docprocServiceRegistry; + } + + public void setDocprocServiceRegistry(ComponentRegistry<DocprocService> docprocServiceRegistry) { + this.docprocServiceRegistry = docprocServiceRegistry; + } + + /** Returns the name of the service processing this. This will never return null */ + public String getServiceName() { + if (service == null) return "default"; + return service; + } + + /** Sets the name of the service processing this. */ + public void setServiceName(String service) { + this.service = service; + } + + /** + * Convenience method for looking up and returning the service processing this. This might return null + * if #getServiceName returns a name that is not registered in {@link com.yahoo.docproc.DocprocService}. + * + * @return the service processing this, or null if unknown. + */ + public DocprocService getService() { + if (docprocServiceRegistry != null) { + return docprocServiceRegistry.getComponent(getServiceName()); + } + return null; + } + + /** Returns a context variable, or null if it is not set */ + public Object getVariable(String name) { + if (context == null) return null; + return context.get(name); + } + + /** + * Returns an iterator of all context variables that are set + * + * @return an iterator over objects of type Map.Entry + */ + public Iterator<Map.Entry<String, Object>> getVariableAndNameIterator() { + if (context == null) context = new HashMap<>(); + return context.entrySet().iterator(); + } + + /** Clears all context variables that have been set */ + public void clearVariables() { + if (context == null) return; + context.clear(); + } + + /** Sets a context variable. */ + public void setVariable(String name, Object value) { + if (context == null) context = new java.util.HashMap<>(); + context.put(name, value); + } + + public Object removeVariable(String name) { + if (context == null) return null; + return context.remove(name); + } + + /** Returns true if this variable is present, even if it is null */ + public boolean hasVariable(String name) { + return context != null && context.containsKey(name); + } + + /** + * Returns the ProcessingEndpoint that is called when this Processing is complete, if any. + * + * @return the ProcessingEndpoint, or null + */ + ProcessingEndpoint getEndpoint() { + return endpoint; + } + + /** + * Sets the ProcessingEndpoint to be called when this Processing is complete. + * + * @param endpoint the ProcessingEndpoint to use + */ + void setEndpoint(ProcessingEndpoint endpoint) { + this.endpoint = endpoint; + } + + public void addDocumentOperation(DocumentOperation documentOperation) { + if (documentsToAdd == null) documentsToAdd = new ArrayList<>(1); + documentsToAdd.add(documentOperation); + } + + private void updateDocumentOperations() { + if (documentsToAdd != null) { + documentOperations.addAll(documentsToAdd); + documentsToAdd.clear(); + } + } + + public List<DocumentOperation> getDocumentOperations() { + updateDocumentOperations(); + return documentOperations; + } + + /** Returns the processors to call in this processing */ + public CallStack callStack() { + return callStack; + } + + /** + * Package-private method to set the callstack of this processing. Only to be used + * by DocprocService.process(Processing). + * + * @param callStack the callstack to set + */ + void setCallStack(CallStack callStack) { + this.callStack = callStack; + } + + public String toString() { + String previousCall = ""; + if (callStack != null) { + Call call = callStack.getLastPopped(); + if (call != null) { + previousCall = "Last call: " + call; + } + } + if (documentOperations.size() == 1) { + return "Processing of " + documentOperations.get(0) + ". " + previousCall; + } else { + String listString = documentOperations.toString(); + if (listString.length() > 100) { + listString = listString.substring(0, 99); + listString += "...]"; + } + + return "Processing of " + listString + ". " + previousCall; + } + } + + int getNumDocsToBeProcessed() { + if (getNumDocsCalled) { + return 0; + } + getNumDocsCalled = true; + return getDocumentOperations().size(); + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java b/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java new file mode 100644 index 00000000000..2159147a476 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java @@ -0,0 +1,13 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public interface ProcessingEndpoint { + + public void processingDone(Processing processing); + + public void processingFailed(Processing processing, Exception exception); + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java new file mode 100644 index 00000000000..16a207171cb --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java @@ -0,0 +1,120 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.Document; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.log.LogLevel; + +/** + * <p>Simple layer on top of {@link DocumentProcessor}, in order to make docproc + * development more user friendly and to the point.</p> + * + * <p>This simply iterates over the {@link DocumentOperation}s in {@link Processing#getDocumentOperations}, and calls + * the appropriate process() method given by this class.</p> + * + * <p>Note that more sophisticated use cases should subclass {@link DocumentProcessor} instead. Specifically, + * it is not possible to return a {@link DocumentProcessor.LaterProgress} from any of the process() methods that SimpleDocumentProcessor + * provides - since their return type is void.</p> + * + * <p>SimpleDocumentProcessor is for the <em>simple</em> cases. For complete control over document processing, + * like returning instances of {@link DocumentProcessor.LaterProgress}, subclass {@link DocumentProcessor} instead.</p> + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a> + */ +public class SimpleDocumentProcessor extends DocumentProcessor { + /** + * Override this to process the Document inside a DocumentPut. + * @deprecated use process(DocumentPut) + * + * @param document the Document to process. + */ + public void process(Document document) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Ignored " + document); + } + } + + /** + * Override this to process DocumentPuts. If this method is not overridden, the implementation in this class + * will ignore DocumentPuts (passing them through un-processed). If processing of this DocumentPut fails, the + * implementation must throw a {@link RuntimeException}. + * + * @param put the DocumentPut to process. + */ + public void process(DocumentPut put) { + process(put.getDocument()); + } + + /** + * Override this to process DocumentUpdates. If this method is not overridden, the implementation in this class + * will ignore DocumentUpdates (passing them through un-processed). If processing of this DocumentUpdate fails, the + * implementation must throw a {@link RuntimeException}. + * + * @param update the DocumentUpdate to process. + */ + public void process(DocumentUpdate update) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Ignored " + update); + } + } + + /** + * Override this to process DocumentRemoves. If this method is not overridden, the implementation in this class + * will ignore DocumentRemoves (passing them through un-processed). If processing of this DocumentRemove fails, the + * implementation must throw a {@link RuntimeException}. + * + * @param remove the DocumentRemove to process. + */ + public void process(DocumentRemove remove) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Ignored " + remove); + } + } + + /** + * Simple process() that follows the official guidelines for + * looping over {@link DocumentOperation}s, and then calls the appropriate, + * overloaded process() depending on the type of base. + * <p> + * Declared as final, so if you want to handle everything yourself + * you should of course extend DocumentProcessor instead of + * SimpleDocumentProcessor and just go about as usual. + * <p> + * It is important to note that when iterating over the {@link DocumentOperation}s in + * {@link com.yahoo.docproc.Processing#getDocumentOperations()}, an exception thrown + * from any of the process() methods provided by this class will be thrown straight + * out of this here. This means that failing one document will fail the + * entire batch. + * + * @param processing the Processing to process. + * @return Progress.DONE, unless a subclass decides to throw an exception + */ + @Override + public final Progress process(Processing processing) { + final int initialSize = processing.getDocumentOperations().size(); + for (DocumentOperation op : processing.getDocumentOperations()) { + try { + if (op instanceof DocumentPut) { + process((DocumentPut) op); + } else if (op instanceof DocumentUpdate) { + process((DocumentUpdate) op); + } else if (op instanceof DocumentRemove) { + process((DocumentRemove) op); + } + } catch (RuntimeException e) { + if (log.isLoggable(LogLevel.DEBUG) && initialSize != 1) { + log.log(LogLevel.DEBUG, + "Processing of document failed, from processing.getDocumentOperations() containing " + + initialSize + " DocumentOperation(s).", e); + } + throw e; + } + } + + return Progress.DONE; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java b/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java new file mode 100644 index 00000000000..a0e49bb4cb2 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java @@ -0,0 +1,14 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +/** + * Exception to be thrown by a document processor on transient failures. Caller + * is welcome to try the call again later. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class TransientFailureException extends RuntimeException { + public TransientFailureException(String s) { + super(s); + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/configuration/.gitignore b/docproc/src/main/java/com/yahoo/docproc/configuration/.gitignore new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/configuration/.gitignore diff --git a/docproc/src/main/java/com/yahoo/docproc/documentstatus/.gitignore b/docproc/src/main/java/com/yahoo/docproc/documentstatus/.gitignore new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/documentstatus/.gitignore diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java new file mode 100644 index 00000000000..134896e3b0c --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java @@ -0,0 +1,58 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.docproc.jdisc.metric.NullMetric; +import com.yahoo.document.DocumentUtil; +import com.yahoo.jdisc.Metric; +import com.yahoo.log.LogLevel; +import com.yahoo.statistics.*; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +class DocprocThreadManager { + private static Logger log = Logger.getLogger(DocprocThreadManager.class.getName()); + + private final long maxConcurrentByteSize; + private final AtomicLong bytesStarted = new AtomicLong(0); + private final AtomicLong bytesFinished = new AtomicLong(0); + + DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb) { + this(maxConcurrentFactor, documentExpansionFactor, containerCoreMemoryMb, Statistics.nullImplementation, + new NullMetric()); + } + + DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb, + Statistics statistics, Metric metric) { + this((long) (((double) DocumentUtil.calculateMaxPendingSize(maxConcurrentFactor, documentExpansionFactor, + containerCoreMemoryMb)) * maxConcurrentFactor)); + } + + DocprocThreadManager(long maxConcurrentByteSize) { + final int MINCONCURRENTBYTES=256*1024*1024; //256M + if (maxConcurrentByteSize < MINCONCURRENTBYTES) { + maxConcurrentByteSize = MINCONCURRENTBYTES; + } + + this.maxConcurrentByteSize = maxConcurrentByteSize; + log.log(LogLevel.CONFIG, "Docproc service allowed to concurrently process " + + (((double) maxConcurrentByteSize) / 1024.0d / 1024.0d) + " megabytes of input data."); + } + + boolean isAboveLimit() { + return (bytesFinished.get() - bytesStarted.get() > maxConcurrentByteSize); + } + void beforeExecute(DocumentProcessingTask task) { + bytesStarted.getAndAdd(task.getApproxSize()); + } + + void afterExecute(DocumentProcessingTask task) { + bytesFinished.getAndAdd(task.getApproxSize()); + } + void shutdown() { + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java new file mode 100644 index 00000000000..dc690acfe0d --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java @@ -0,0 +1,58 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.log.LogLevel; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocprocThreadPoolExecutor extends ThreadPoolExecutor { + + private static Logger log = Logger.getLogger(DocprocThreadPoolExecutor.class.getName()); + private DocprocThreadManager threadManager; + + public DocprocThreadPoolExecutor(int maxNumThreads, BlockingQueue<Runnable> queue, DocprocThreadManager threadMgr) { + super((maxNumThreads > 0) ? maxNumThreads : Runtime.getRuntime().availableProcessors(), + (maxNumThreads > 0) ? maxNumThreads : Runtime.getRuntime().availableProcessors(), + 5, TimeUnit.MINUTES, + queue, + new DaemonThreadFactory("docproc-")); + this.threadManager = threadMgr; + allowCoreThreadTimeOut(false); + log.log(LogLevel.DEBUG, "Created docproc thread pool with " + super.getCorePoolSize() + " worker threads."); + } + + @Override + protected void beforeExecute(Thread thread, Runnable runnable) { + threadManager.beforeExecute((DocumentProcessingTask) runnable); + } + + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + threadManager.afterExecute((DocumentProcessingTask) runnable); + } + + @Override + public void shutdown() { + super.shutdown(); + threadManager.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + List<Runnable> list = super.shutdownNow(); + threadManager.shutdown(); + return list; + } + + boolean isAboveLimit() { + return threadManager.isAboveLimit(); + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java new file mode 100644 index 00000000000..8fa428482e1 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java @@ -0,0 +1,235 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.google.inject.Inject; +import com.yahoo.component.chain.Chain; +import com.yahoo.component.chain.model.ChainsModel; +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.config.docproc.DocprocConfig; +import com.yahoo.config.docproc.SchemamappingConfig; +import com.yahoo.container.core.ChainsConfig; +import com.yahoo.container.core.document.ContainerDocumentConfig; +import com.yahoo.container.jdisc.ContainerMbusConfig; +import com.yahoo.docproc.AbstractConcreteDocumentFactory; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext; +import com.yahoo.docproc.proxy.SchemaMap; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.documentapi.ThroughputLimitQueue; +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.handler.AbstractRequestHandler; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.jdisc.MbusRequest; +import com.yahoo.processing.execution.chain.ChainRegistry; +import com.yahoo.statistics.Statistics; + +import java.util.TimerTask; +import java.util.concurrent.*; +import java.util.logging.Logger; + +import static com.yahoo.component.chain.ChainsConfigurer.prepareChainRegistry; +import static com.yahoo.component.chain.model.ChainsModelBuilder.buildFromConfig; + +/** + * TODO: Javadoc + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocumentProcessingHandler extends AbstractRequestHandler { + + private static Logger log = Logger.getLogger(DocumentProcessingHandler.class.getName()); + private final ComponentRegistry<DocprocService> docprocServiceRegistry; + private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry; + private final ChainRegistry<DocumentProcessor> chainRegistry = new ChainRegistry<>(); + private DocprocThreadPoolExecutor threadPool; + private final ScheduledThreadPoolExecutor laterExecutor = + new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("docproc-later-")); + private ContainerDocumentConfig containerDocConfig; + private final DocumentTypeManager documentTypeManager; + + public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry, + ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry, + ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, + DocprocThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager, + ChainsModel chainsModel, SchemaMap schemaMap, Statistics statistics, + Metric metric, + ContainerDocumentConfig containerDocConfig) { + this.docprocServiceRegistry = docprocServiceRegistry; + this.docFactoryRegistry = docFactoryRegistry; + this.threadPool = threadPool; + this.containerDocConfig = containerDocConfig; + this.documentTypeManager = documentTypeManager; + DocprocService.schemaMap = schemaMap; + threadPool.prestartCoreThread(); + laterExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + laterExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + + if (chainsModel != null) { + prepareChainRegistry(chainRegistry, chainsModel, documentProcessorComponentRegistry); + + for (Chain<DocumentProcessor> chain : chainRegistry.allComponents()) { + log.config("Setting up call stack for chain " + chain.getId()); + DocprocService service = + new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager); + service.setInService(true); + docprocServiceRegistry.register(service.getId(), service); + } + } + } + + public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry, + ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry, + ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, + DocumentProcessingHandlerParameters params) { + this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry, + new DocprocThreadPoolExecutor(params.getMaxNumThreads(), + (params.getMaxQueueTimeMs() > 0) + ? new ThroughputLimitQueue<>(params.getMaxQueueTimeMs()) + : (params.getMaxQueueTimeMs() < 0) + ? new LinkedBlockingQueue<>() + : new PriorityBlockingQueue<>(), //Probably no need to bound this queue, see bug #4254537 + new DocprocThreadManager(params.getMaxConcurrentFactor(), + params.getDocumentExpansionFactor(), + params.getContainerCoreMemoryMb(), + params.getStatisticsManager(), + params.getMetric())), + params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(), + params.getStatisticsManager(), + params.getMetric(), + params.getContainerDocConfig()); + } + + @Inject + public DocumentProcessingHandler(ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry, + ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, + ChainsConfig chainsConfig, + SchemamappingConfig mappingConfig, + DocumentmanagerConfig docManConfig, + DocprocConfig docprocConfig, + ContainerMbusConfig containerMbusConfig, + ContainerDocumentConfig containerDocConfig, + Statistics manager, + Metric metric) { + this(new ComponentRegistry<>(), + documentProcessorComponentRegistry, docFactoryRegistry, new DocumentProcessingHandlerParameters().setMaxNumThreads + (docprocConfig.numthreads()) + .setMaxConcurrentFactor(containerMbusConfig.maxConcurrentFactor()) + .setDocumentExpansionFactor(containerMbusConfig.documentExpansionFactor()) + .setContainerCoreMemoryMb(containerMbusConfig.containerCoreMemory()) + .setMaxQueueTimeMs(docprocConfig.maxqueuetimems()) + .setDocumentTypeManager(new DocumentTypeManager(docManConfig)) + .setChainsModel(buildFromConfig(chainsConfig)).setSchemaMap(configureMapping(mappingConfig)) + .setStatisticsManager(manager) + .setMetric(metric) + .setContainerDocumentConfig(containerDocConfig)); + } + + @Override + protected void destroy() { + threadPool.shutdown(); //calling shutdownNow() seems like a bit of an overkill + } + + public ComponentRegistry<DocprocService> getDocprocServiceRegistry() { + return docprocServiceRegistry; + } + + public ChainRegistry<DocumentProcessor> getChains() { + return chainRegistry; + } + + private static SchemaMap configureMapping(SchemamappingConfig mappingConfig) { + SchemaMap map = new SchemaMap(); + map.configure(mappingConfig); + return map; + } + + + private static CallStack convertToCallStack(Chain<DocumentProcessor> chain, Statistics statistics, Metric metric) { + CallStack stack = new CallStack(chain.getId().stringValue(), statistics, metric); + for (DocumentProcessor processor : chain.components()) { + processor.getFieldMap().putAll(DocprocService.schemaMap.chainMap(chain.getId().stringValue(), processor.getId().stringValue())); + stack.addLast(processor); + } + return stack; + } + + @Override + public ContentChannel handleRequest(Request request, ResponseHandler handler) { + RequestContext requestContext; + if (request instanceof MbusRequest) { + requestContext = new MbusRequestContext((MbusRequest) request, handler, docprocServiceRegistry, docFactoryRegistry, containerDocConfig); + } else { + //Other types can be added here in the future + throw new IllegalArgumentException("Request type not supported: " + request); + } + + if (!requestContext.isProcessable()) { + requestContext.skip(); + return null; + } + + DocprocService service = docprocServiceRegistry.getComponent(requestContext.getServiceName()); + //No need to enqueue a task if the docproc chain is empty, just forward requestContext + if (service == null) { + log.log(LogLevel.ERROR, "DocprocService for session '" + requestContext.getServiceName() + + "' not found, returning request '" + requestContext + "'."); + requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE, + "DocprocService " + requestContext.getServiceName() + " not found."); + return null; + } else if (service.getExecutor().getCallStack().size() == 0) { + //call stack was empty, just forward message + requestContext.skip(); + return null; + } + + DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service); + submit(task); + return null; + } + + @SuppressWarnings("unchecked") + void submit(DocumentProcessingTask task) { + if (threadPool.isAboveLimit()) { + task.queueFull(); + } else { + try { + threadPool.execute(task); + } catch (RejectedExecutionException ree) { + task.queueFull(); + } + } + } + + void submit(DocumentProcessingTask task, long delay) { + LaterTimerTask timerTask = new LaterTimerTask(task, delay); + laterExecutor.schedule(timerTask, delay, TimeUnit.MILLISECONDS); + } + + private class LaterTimerTask extends TimerTask { + private DocumentProcessingTask processingTask; + private long delay; + + private LaterTimerTask(DocumentProcessingTask processingTask, long delay) { + this.delay = delay; + log.log(LogLevel.DEBUG, "Enqueueing in " + delay + " ms due to Progress.LATER: " + processingTask); + this.processingTask = processingTask; + } + + @Override + public void run() { + log.log(LogLevel.DEBUG, "Submitting after having waited " + delay + " ms in LATER queue: " + processingTask); + submit(processingTask); + } + } + + public DocumentTypeManager getDocumentTypeManager() { + return documentTypeManager; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java new file mode 100644 index 00000000000..1a79e4e13c8 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java @@ -0,0 +1,169 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.component.chain.model.ChainsModel; +import com.yahoo.container.core.document.ContainerDocumentConfig; +import com.yahoo.docproc.jdisc.metric.NullMetric; +import com.yahoo.docproc.proxy.SchemaMap; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.jdisc.Metric; +import com.yahoo.statistics.Statistics; + +/** + * Class to hold parameters given to DocumentProcessingHandler, typically used by unit tests. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + * @see com.yahoo.docproc.jdisc.DocumentProcessingHandler + */ +public class DocumentProcessingHandlerParameters { + private int maxNumThreads = 0; + private double maxConcurrentFactor = 0.2; + private double documentExpansionFactor = 20.0; + private int containerCoreMemoryMb = 50; + private long maxQueueTimeMs = 0; + private DocumentTypeManager documentTypeManager = null; + private ChainsModel chainsModel = null; + private SchemaMap schemaMap = null; + private Statistics statisticsManager = Statistics.nullImplementation; + private Metric metric = new NullMetric(); + private ContainerDocumentConfig containerDocConfig; + + public DocumentProcessingHandlerParameters() { + } + + /** + * Returns the number of megabytes of memory reserved for container core classes and data. + * + * @return the number of megabytes of memory reserved for container core classes and data. + */ + public int getContainerCoreMemoryMb() { + return containerCoreMemoryMb; + } + + public DocumentProcessingHandlerParameters setContainerCoreMemoryMb(int containerCoreMemoryMb) { + this.containerCoreMemoryMb = containerCoreMemoryMb; + return this; + } + + public Metric getMetric() { + return metric; + } + + public DocumentProcessingHandlerParameters setMetric(Metric metric) { + this.metric = metric; + return this; + } + + /** + * Returns the document expansion factor, i.e. by what factor a serialized and possibly compressed + * input document is expected to expand during deserialization, including any temporary memory needed + * when processing it. + * + * @return the document expansion factor. + */ + public double getDocumentExpansionFactor() { + return documentExpansionFactor; + } + + public DocumentProcessingHandlerParameters setDocumentExpansionFactor(double documentExpansionFactor) { + this.documentExpansionFactor = documentExpansionFactor; + return this; + } + + /** + * Returns the max concurrent factor. + * + * @return the max concurrent factor. + */ + public double getMaxConcurrentFactor() { + return maxConcurrentFactor; + } + + public DocumentProcessingHandlerParameters setMaxConcurrentFactor(double maxConcurrentFactor) { + this.maxConcurrentFactor = maxConcurrentFactor; + return this; + } + + /** + * Returns the maximum time (in milliseconds) that a document may stay in the input queue. The default value + * of 0 disables this functionality. + * + * @return the maximum time (in milliseconds) that a document may stay in the input queue. + */ + public long getMaxQueueTimeMs() { + return maxQueueTimeMs; + } + + public DocumentProcessingHandlerParameters setMaxQueueTimeMs(long maxQueueTimeMs) { + this.maxQueueTimeMs = maxQueueTimeMs; + return this; + } + + /** + * Returns the maximum number of thread that the thread pool will ever attempt to run simultaneously. + * + * @return the maximum number of thread that the thread pool will ever attempt to run simultaneously. + */ + public int getMaxNumThreads() { + return maxNumThreads; + } + + public DocumentProcessingHandlerParameters setMaxNumThreads(int maxNumThreads) { + this.maxNumThreads = maxNumThreads; + return this; + } + + public DocumentTypeManager getDocumentTypeManager() { + return documentTypeManager; + } + + public DocumentProcessingHandlerParameters setDocumentTypeManager(DocumentTypeManager documentTypeManager) { + this.documentTypeManager = documentTypeManager; + return this; + } + + /** + * Returns the chains model, used to build call stacks. + * @return the chains model, used to build call stacks. + */ + public ChainsModel getChainsModel() { + return chainsModel; + } + + public DocumentProcessingHandlerParameters setChainsModel(ChainsModel chainsModel) { + this.chainsModel = chainsModel; + return this; + } + + /** + * Returns the schema map to be used by the docproc handler. + * + * @return the schema map to be used by the docproc handler. + */ + public SchemaMap getSchemaMap() { + return schemaMap; + } + + public DocumentProcessingHandlerParameters setSchemaMap(SchemaMap schemaMap) { + this.schemaMap = schemaMap; + return this; + } + + public Statistics getStatisticsManager() { + return statisticsManager; + } + + public DocumentProcessingHandlerParameters setStatisticsManager(Statistics statisticsManager) { + this.statisticsManager = statisticsManager; + return this; + } + + public DocumentProcessingHandlerParameters setContainerDocumentConfig(ContainerDocumentConfig containerDocConfig) { + this.containerDocConfig = containerDocConfig; + return this; + } + + public ContainerDocumentConfig getContainerDocConfig() { + return containerDocConfig; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java new file mode 100644 index 00000000000..7e92e8eb5a1 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java @@ -0,0 +1,235 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.collections.Tuple2; +import com.yahoo.docproc.Call; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocprocExecutor; +import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.HandledProcessingException; +import com.yahoo.docproc.Processing; +import com.yahoo.log.LogLevel; +import com.yahoo.yolean.Exceptions; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocumentProcessingTask implements Comparable<DocumentProcessingTask>, Runnable { + private static Logger log = Logger.getLogger(DocumentProcessingTask.class.getName()); + private final List<Processing> processings = new ArrayList<>(); + private final List<Processing> processingsDone = new ArrayList<>(); + + private final DocumentProcessingHandler docprocHandler; + private RequestContext requestContext; + private int waitCounter; + + private final static AtomicLong seq = new AtomicLong(); + private final long seqNum; + private final DocprocService service; + + public DocumentProcessingTask(RequestContext requestContext, DocumentProcessingHandler docprocHandler, + DocprocService service) { + seqNum = seq.getAndIncrement(); + this.requestContext = requestContext; + this.docprocHandler = docprocHandler; + this.waitCounter = 10; + this.service = service; + } + + @Override + public void run() { + try { + try { + processings.addAll(requestContext.getProcessings()); + } catch (Exception e) { + //deserialization failed: + log.log(LogLevel.WARNING, "Deserialization of message failed.", e); + requestContext.processingFailed(e); + return; + } + + DocprocExecutor executor = service.getExecutor(); + DocumentProcessor.Progress progress = process(executor); + + if (DocumentProcessor.Progress.LATER.equals(progress) && !processings.isEmpty()) { + DocumentProcessor.LaterProgress laterProgress = (DocumentProcessor.LaterProgress) progress; + docprocHandler.submit(this, laterProgress.getDelay()); + } + } catch (Error error) { + try { + log.log(LogLevel.FATAL, Exceptions.toMessageString(error), error); + } catch (Throwable t) { + // do nothing + } finally { + Runtime.getRuntime().halt(1); + } + } + } + + /** + * Used by DocprocThreadManager. If a ProcessingTask has been taken by a thread, it can wait() no longer than + * waitCounter (currently 10) times before being executed. This is to prevent large tasks from being delayed + * forever. + * + * @return true if this task can wait, false if it must run NOW. + */ + boolean doWait() { + --waitCounter; + return (waitCounter > 0); + } + + /** + * Processes a single Processing, and fails the message if this processing fails. + * + * @param executor the DocprocService to use for processing + */ + private DocumentProcessor.Progress process(DocprocExecutor executor) { + Iterator<Processing> iterator = processings.iterator(); + List<Tuple2<DocumentProcessor.Progress, Processing>> later = new ArrayList<>(); + while (iterator.hasNext()) { + Processing processing = iterator.next(); + iterator.remove(); + if (requestContext.hasExpired()) { + DocumentProcessor.Progress progress = DocumentProcessor.Progress.FAILED; + final String location; + if (processing != null) { + final CallStack callStack = processing.callStack(); + if (callStack != null) { + final Call lastPopped = callStack.getLastPopped(); + if (lastPopped != null) { + location = lastPopped.toString(); + } else { + location = "empty call stack or no processors popped"; + } + } else { + location = "no call stack"; + } + } else { + location = "no processing instance"; + } + String errorMsg = processing + " failed, " + location; + log.log(Level.FINE, "Time is up for '" + errorMsg + "'."); + requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE, "Time is up."); + return progress; + } + + DocumentProcessor.Progress progress = DocumentProcessor.Progress.FAILED; + try { + progress = executor.process(processing); + } catch (Exception e) { + logProcessingFailure(processing, e); + requestContext.processingFailed(e); + return progress; + } + + if (DocumentProcessor.Progress.LATER.equals(progress)) { + later.add(new Tuple2<>(progress, processing)); + } else if (DocumentProcessor.Progress.DONE.equals(progress)) { + processingsDone.add(processing); + } else if (DocumentProcessor.Progress.FAILED.equals(progress)) { + logProcessingFailure(processing, null); + requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE, + "Document processing failed."); + return progress; + } else if (DocumentProcessor.Progress.PERMANENT_FAILURE.equals(progress)) { + logProcessingFailure(processing, null); + requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE, + "Document processing failed."); + return progress; + } + } + + // Processings that have FAILED will have made this method terminate by now. + // We now have successful Processings in 'processingsDone' and + // the ones that have returned LATER in 'later'. + + if (!later.isEmpty()) { + // Outdated comment: + // "if this was a multioperationmessage and more than one of the processings returned LATER, + // return the one with the lowest timeout:" + // As multioperation is removed this can probably be simplified? + DocumentProcessor.LaterProgress shortestDelay = (DocumentProcessor.LaterProgress) later.get(0).first; + for (Tuple2<DocumentProcessor.Progress, Processing> tuple : later) { + // re-add the LATER one to processings + processings.add(tuple.second); + // check to see if this one had a lower timeout than the previous one: + if (((DocumentProcessor.LaterProgress) tuple.first).getDelay() < shortestDelay.getDelay()) { + shortestDelay = (DocumentProcessor.LaterProgress) tuple.first; + } + } + return shortestDelay; + } else { + requestContext.processingDone(processingsDone); + return DocumentProcessor.Progress.DONE; + } + } + + + void queueFull() { + requestContext.processingFailed(RequestContext.ErrorCode.ERROR_BUSY, + "Queue temporarily full. Returning message " + requestContext + + ". Will be automatically resent."); + } + + public int compareTo(DocumentProcessingTask other) { + int ourPriority = requestContext.getPriority(); + int otherPriority = other.requestContext.getPriority(); + int res = (ourPriority == otherPriority) ? 0 : ((ourPriority < otherPriority) ? -1 : 1); + if (res == 0) { + res = (seqNum == other.seqNum) ? 0 : ((seqNum < other.seqNum) ? -1 : 1); + } + return res; + } + + @Override + public String toString() { + return "ProcessingTask{" + + "processings=" + processings + + ", processingsDone=" + processingsDone + + ", requestContext=" + requestContext + + ", seqNum=" + seqNum + + '}'; + } + + public int getApproxSize() { + return requestContext.getApproxSize(); + } + + final long getSeqNum() { + return seqNum; + } + + private static void logProcessingFailure(Processing processing, Exception exception) { + //LOGGING ONLY: + String errorMsg = processing + " failed at " + processing.callStack().getLastPopped(); + if (exception != null) { + if (exception instanceof HandledProcessingException) { + errorMsg += ". Error message: " + exception.getMessage(); + log.log(Level.WARNING, errorMsg); + log.log(Level.FINE, "Chained exception:", exception); + } else { + log.log(Level.WARNING, errorMsg, exception); + } + } else { + log.log(Level.WARNING, errorMsg); + } + //LOGGING OF STACK TRACE: + if (exception != null) { + StringWriter backtrace = new StringWriter(); + exception.printStackTrace(new PrintWriter(backtrace)); + log.log(LogLevel.DEBUG, "Failed to process " + processing + ": " + backtrace.toString()); + } + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java new file mode 100644 index 00000000000..eeda93f12a1 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java @@ -0,0 +1,66 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.docproc.Processing; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.Response; + +import java.net.URI; +import java.util.List; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public interface RequestContext { + + public List<Processing> getProcessings(); + + public String getServiceName(); + + public URI getUri(); + + public boolean isProcessable(); + + public int getApproxSize(); + + public int getPriority(); + + public void processingDone(List<Processing> processing); + + public void processingFailed(ErrorCode error, String msg); + + public void processingFailed(Exception exception); + + /** + * Will check if the given timeout has expired + * @return true if the timeout has expired. + */ + public default boolean hasExpired() { return false;} + + public void skip(); + + public enum ErrorCode { + //transient: + ERROR_ABORTED(Response.Status.TEMPORARY_REDIRECT, DocumentProtocol.ERROR_ABORTED), + ERROR_BUSY(Response.Status.TEMPORARY_REDIRECT, DocumentProtocol.ERROR_BUSY), + //fatal: + ERROR_PROCESSING_FAILURE(Response.Status.INTERNAL_SERVER_ERROR, DocumentProtocol.ERROR_PROCESSING_FAILURE); + + + private int discStatus; + private int documentProtocolStatus; + + private ErrorCode(int discStatus, int documentProtocolStatus) { + this.discStatus = discStatus; + this.documentProtocolStatus = documentProtocolStatus; + } + + public int getDiscStatus() { + return discStatus; + } + + public int getDocumentProtocolStatus() { + return documentProtocolStatus; + } + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java new file mode 100644 index 00000000000..98a091e5dfd --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java @@ -0,0 +1,235 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc.messagebus; + +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.concurrent.CopyOnWriteHashMap; +import com.yahoo.container.core.document.ContainerDocumentConfig; +import com.yahoo.docproc.AbstractConcreteDocumentFactory; +import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.HandledProcessingException; +import com.yahoo.docproc.Processing; +import com.yahoo.docproc.TransientFailureException; +import com.yahoo.docproc.jdisc.RequestContext; +import com.yahoo.document.DocumentOperation; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.Request; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.RequestDispatch; +import com.yahoo.jdisc.handler.ResponseDispatch; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.jdisc.MbusRequest; +import com.yahoo.messagebus.jdisc.MbusResponse; +import com.yahoo.messagebus.routing.Route; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class MbusRequestContext implements RequestContext, ResponseHandler { + + private final static Logger log = Logger.getLogger(MbusRequestContext.class.getName()); + private final static CopyOnWriteHashMap<String, URI> uriCache = new CopyOnWriteHashMap<>(); + private final AtomicBoolean deserialized = new AtomicBoolean(false); + private final AtomicBoolean responded = new AtomicBoolean(false); + private final ProcessingFactory processingFactory; + private final MessageFactory messageFactory; + private final MbusRequest request; + private final DocumentMessage requestMsg; + private final ResponseHandler responseHandler; + private volatile int cachedApproxSize; + // When spawning off new documents inside document processor, we do not want + // throttling since this can lead to live locks. This is because the + // document being processed is a resource and is then grabbing more resources of + // the same type without releasing its own resources. + public final static String internalNoThrottledSource = "internalNoThrottledSource"; + + public MbusRequestContext(MbusRequest request, ResponseHandler responseHandler, + ComponentRegistry<DocprocService> docprocServiceComponentRegistry, + ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, + ContainerDocumentConfig containerDocConfig) { + this.request = request; + this.requestMsg = (DocumentMessage)request.getMessage(); + this.responseHandler = responseHandler; + this.processingFactory = new ProcessingFactory(docprocServiceComponentRegistry, docFactoryRegistry, + containerDocConfig, getServiceName()); + this.messageFactory = newMessageFactory(requestMsg); + } + + @Override + public List<Processing> getProcessings() { + if (deserialized.getAndSet(true)) { + return Collections.emptyList(); + } + return processingFactory.fromMessage(requestMsg); + } + + @Override + public void skip() { + if (deserialized.get()) { + throw new IllegalStateException("Can not skip processing after deserialization."); + } + dispatchRequest(requestMsg, request.getUri().getPath(), responseHandler); + } + + @Override + public void processingDone(List<Processing> processings) { + List<DocumentMessage> messages = new ArrayList<>(); + if (messageFactory != null) { + for (Processing processing : processings) { + for (DocumentOperation documentOperation : processing.getDocumentOperations()) { + messages.add(messageFactory.fromDocumentOperation(processing, documentOperation)); + } + } + } + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Forwarding " + messages.size() + " messages from " + processings.size() + + " processings."); + } + if (messages.isEmpty()) { + dispatchResponse(Response.Status.OK); + return; + } + long inputSequenceId = requestMsg.getSequenceId(); + ResponseMerger responseHandler = new ResponseMerger(requestMsg, messages.size(), this); + for (Message message : messages) { + // See comment for internalNoThrottledSource. + dispatchRequest(message, (inputSequenceId == message.getSequenceId()) + ? getUri().getPath() + : "/" + internalNoThrottledSource, + responseHandler); + } + } + + @Override + public void processingFailed(Exception exception) { + ErrorCode errorCode; + if (exception instanceof TransientFailureException) { + errorCode = ErrorCode.ERROR_ABORTED; + } else { + errorCode = ErrorCode.ERROR_PROCESSING_FAILURE; + } + StringBuilder errorMsg = new StringBuilder("Processing failed."); + if (exception instanceof HandledProcessingException) { + errorMsg.append(" Error message: ").append(exception.getMessage()); + } else if (exception != null) { + errorMsg.append(" Error message: ").append(exception.toString()); + } + errorMsg.append(" -- See Vespa log for details."); + processingFailed(errorCode, errorMsg.toString()); + } + + @Override + public void processingFailed(ErrorCode errorCode, String errorMsg) { + MbusResponse response = new MbusResponse(errorCode.getDiscStatus(), requestMsg.createReply()); + response.getReply().addError(new com.yahoo.messagebus.Error(errorCode.getDocumentProtocolStatus(), errorMsg)); + ResponseDispatch.newInstance(response).dispatch(this); + } + + @Override + public int getApproxSize() { + if (cachedApproxSize > 0) { + return cachedApproxSize; + } + cachedApproxSize = requestMsg.getApproxSize(); + return cachedApproxSize; + } + + @Override + public int getPriority() { + return requestMsg.getPriority().getValue(); + } + + @Override + public URI getUri() { + return request.getUri(); + } + + @Override + public String getServiceName() { + String path = getUri().getPath(); + return path.substring(7, path.length()); + } + + @Override + public boolean isProcessable() { + Message msg = requestMsg; + switch (msg.getType()) { + case DocumentProtocol.MESSAGE_PUTDOCUMENT: + case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: + case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: + case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE: + return true; + } + return false; + } + + @Override + public boolean hasExpired() { + return requestMsg.isExpired(); + } + + @Override + public ContentChannel handleResponse(Response response) { + if (responded.getAndSet(true)) { + return null; + } + Reply reply = ((MbusResponse)response).getReply(); + reply.swapState(requestMsg); + return responseHandler.handleResponse(response); + } + + private void dispatchResponse(int status) { + ResponseDispatch.newInstance(new MbusResponse(status, requestMsg.createReply())).dispatch(this); + } + + private void dispatchRequest(final Message msg, final String uriPath, final ResponseHandler handler) { + try { + new RequestDispatch() { + + @Override + protected Request newRequest() { + return new MbusRequest(request, resolveUri(uriPath), msg); + } + + @Override + public ContentChannel handleResponse(Response response) { + return handler.handleResponse(response); + } + }.dispatch(); + } catch (Exception e) { + dispatchResponse(Response.Status.INTERNAL_SERVER_ERROR); + e.printStackTrace(); + } + } + + private static MessageFactory newMessageFactory(final DocumentMessage msg) { + if (msg == null) { + return null; + } + final Route route = msg.getRoute(); + if (route == null || !route.hasHops()) { + return null; + } + return new MessageFactory(msg); + } + + private static URI resolveUri(String path) { + URI uri = uriCache.get(path); + if (uri == null) { + uri = URI.create("mbus://remotehost" + path); + uriCache.put(path, uri); + } + return uri; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MessageFactory.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MessageFactory.java new file mode 100644 index 00000000000..d3962a20ea8 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MessageFactory.java @@ -0,0 +1,67 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc.messagebus; + +import com.yahoo.docproc.Processing; +import com.yahoo.document.*; +import com.yahoo.documentapi.messagebus.loadtypes.LoadType; +import com.yahoo.documentapi.messagebus.protocol.*; +import com.yahoo.log.LogLevel; +import com.yahoo.messagebus.Message; +import com.yahoo.vdslib.DocumentList; +import com.yahoo.vdslib.Entry; + +import java.util.logging.Logger; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +class MessageFactory { + + private final static Logger log = Logger.getLogger(MessageFactory.class.getName()); + private final Message requestMsg; + private final LoadType loadType; + private final DocumentProtocol.Priority priority; + + public MessageFactory(DocumentMessage requestMsg) { + this.requestMsg = requestMsg; + loadType = requestMsg.getLoadType(); + priority = requestMsg.getPriority(); + } + + public DocumentMessage fromDocumentOperation(Processing processing, DocumentOperation documentOperation) { + DocumentMessage msg = newMessage(documentOperation); + msg.setLoadType(loadType); + msg.setPriority(priority); + msg.setRoute(requestMsg.getRoute()); + msg.setTimeReceivedNow(); + msg.setTimeRemaining(requestMsg.getTimeRemainingNow()); + msg.getTrace().setLevel(requestMsg.getTrace().getLevel()); + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Created '" + msg.getClass().getName() + + "', route = '" + msg.getRoute() + + "', priority = '" + msg.getPriority().name() + + "', load type = '" + msg.getLoadType() + + "', trace level = '" + msg.getTrace().getLevel() + + "', time remaining = '" + msg.getTimeRemaining() + "'."); + } + return msg; + } + + private static DocumentMessage newMessage(DocumentOperation documentOperation) { + final TestAndSetMessage message; + + if (documentOperation instanceof DocumentPut) { + message = new PutDocumentMessage(((DocumentPut)documentOperation)); + } else if (documentOperation instanceof DocumentUpdate) { + message = new UpdateDocumentMessage((DocumentUpdate)documentOperation); + } else if (documentOperation instanceof DocumentRemove) { + message = new RemoveDocumentMessage(documentOperation.getId()); + } else { + throw new UnsupportedOperationException(documentOperation.getClass().getName()); + } + + message.setCondition(documentOperation.getCondition()); + return message; + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java new file mode 100644 index 00000000000..19c9d2d86d1 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java @@ -0,0 +1,118 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc.messagebus; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; +import com.yahoo.component.ComponentId; +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.container.core.document.ContainerDocumentConfig; +import com.yahoo.docproc.AbstractConcreteDocumentFactory; +import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.Processing; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.documentapi.messagebus.protocol.BatchDocumentUpdateMessage; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.messagebus.Message; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +class ProcessingFactory { + + private final static Logger log = Logger.getLogger(ProcessingFactory.class.getName()); + private final ComponentRegistry<DocprocService> docprocServiceComponentRegistry; + private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry; + private final ContainerDocumentConfig containerDocConfig; + private final String serviceName; + + public ProcessingFactory(ComponentRegistry<DocprocService> docprocServiceComponentRegistry, + ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry, + ContainerDocumentConfig containerDocConfig, + String serviceName) { + this.docprocServiceComponentRegistry = docprocServiceComponentRegistry; + this.docFactoryRegistry = docFactoryRegistry; + this.containerDocConfig = containerDocConfig; + this.serviceName = serviceName; + } + + public List<Processing> fromMessage(Message message) { + List<Processing> processings = new ArrayList<>(); + switch (message.getType()) { + case DocumentProtocol.MESSAGE_PUTDOCUMENT: { + PutDocumentMessage putMessage = (PutDocumentMessage) message; + DocumentPut putOperation = new DocumentPut(createPutDocument(putMessage)); + putOperation.setCondition(putMessage.getCondition()); + processings.add(createProcessing(putOperation, message)); + break; + } + case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: { + UpdateDocumentMessage updateMessage = (UpdateDocumentMessage) message; + DocumentUpdate updateOperation = updateMessage.getDocumentUpdate(); + updateOperation.setCondition(updateMessage.getCondition()); + processings.add(createProcessing(updateOperation, message)); + break; + } + case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: { + RemoveDocumentMessage removeMessage = (RemoveDocumentMessage) message; + DocumentRemove removeOperation = new DocumentRemove(removeMessage.getDocumentId()); + removeOperation.setCondition(removeMessage.getCondition()); + processings.add(createProcessing(removeOperation, message)); + break; + } + case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE: { + for (DocumentUpdate update : ((BatchDocumentUpdateMessage) message).getUpdates()) { + processings.add(createProcessing(update, message)); + } + break; + } + } + return processings; + } + + private Document createPutDocument(PutDocumentMessage msg) { + Document document = msg.getDocumentPut().getDocument(); + String typeName = document.getDataType().getName(); + ContainerDocumentConfig.Doctype typeConfig = getDocumentConfig(typeName); + if (typeConfig == null) { + return document; + } + return createConcreteDocument(document, typeConfig); + } + + private Document createConcreteDocument(Document document, ContainerDocumentConfig.Doctype typeConfig) { + String componentId = typeConfig.factorycomponent(); // Class name of the factory + AbstractConcreteDocumentFactory cdf = docFactoryRegistry.getComponent(new ComponentId(componentId)); + if (cdf == null) { + log.fine("Unable to get document factory component '" + componentId + "' from document factory registry."); + return document; + } + return cdf.getDocumentCopy(document.getDataType().getName(), document, document.getId()); + } + + private ContainerDocumentConfig.Doctype getDocumentConfig(String name) { + for (ContainerDocumentConfig.Doctype type : containerDocConfig.doctype()) { + if (name.equals(type.type())) { + return type; + } + } + return null; + } + + private Processing createProcessing(DocumentOperation documentOperation, Message message) { + Processing processing = new Processing(); + processing.addDocumentOperation(documentOperation); + processing.setServiceName(serviceName); + processing.setDocprocServiceRegistry(docprocServiceComponentRegistry); + processing.setVariable("route", message.getRoute()); + processing.setVariable("timeout", message.getTimeRemaining()); + return processing; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ResponseMerger.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ResponseMerger.java new file mode 100644 index 00000000000..f29618a0e44 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ResponseMerger.java @@ -0,0 +1,56 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc.messagebus; + +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.handler.ContentChannel; +import com.yahoo.jdisc.handler.ResponseDispatch; +import com.yahoo.jdisc.handler.ResponseHandler; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.TraceNode; +import com.yahoo.messagebus.jdisc.MbusClient; +import com.yahoo.messagebus.jdisc.MbusResponse; +import com.yahoo.messagebus.jdisc.StatusCodes; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +class ResponseMerger implements ResponseHandler { + + private final Message requestMsg; + private final TraceNode requestTrace = new TraceNode().setStrict(false); + private final ResponseHandler responseHandler; + private final List<Reply> replies; + private int numPending; + + public ResponseMerger(Message requestMsg, int numPending, ResponseHandler responseHandler) { + this.requestMsg = requestMsg; + this.responseHandler = responseHandler; + this.replies = new ArrayList<>(numPending); + this.numPending = numPending; + } + + @Override + public ContentChannel handleResponse(Response response) { + synchronized (this) { + if (response instanceof MbusResponse) { + Reply reply = ((MbusResponse)response).getReply(); + requestTrace.addChild(reply.getTrace().getRoot()); + replies.add(reply); + } + if (--numPending != 0) { + return null; + } + } + requestMsg.getTrace().getRoot().addChild(requestTrace); + Reply reply = DocumentProtocol.merge(replies); + Response mbusResponse = new MbusResponse(StatusCodes.fromMbusReply(reply), reply); + ResponseDispatch.newInstance(mbusResponse).dispatch(responseHandler); + return null; + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/package-info.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/package-info.java new file mode 100644 index 00000000000..2d704fc70da --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.docproc.jdisc.messagebus; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/metric/NullMetric.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/metric/NullMetric.java new file mode 100644 index 00000000000..e7c043d352d --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/metric/NullMetric.java @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc.metric; + +import com.yahoo.jdisc.Metric; + +import java.util.Map; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class NullMetric implements Metric { + @Override + public void set(String key, Number val, Context ctx) { + } + + @Override + public void add(String key, Number val, Context ctx) { + } + + @Override + public Context createContext(Map<String, ?> properties) { + return NullContext.INSTANCE; + } + + private static class NullContext implements Context { + private static final NullContext INSTANCE = new NullContext(); + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/package-info.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/package-info.java new file mode 100644 index 00000000000..610aa2462a4 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.docproc.jdisc; + +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/docproc/src/main/java/com/yahoo/docproc/package-info.java b/docproc/src/main/java/com/yahoo/docproc/package-info.java new file mode 100644 index 00000000000..b95e4bb128b --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/package-info.java @@ -0,0 +1,7 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi +package com.yahoo.docproc; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java new file mode 100644 index 00000000000..b3b29a41cc7 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java @@ -0,0 +1,361 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.proxy; + +import com.yahoo.docproc.Accesses; +import com.yahoo.docproc.DocumentOperationWrapper; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.Field; +import com.yahoo.document.FieldPath; +import com.yahoo.document.datatypes.FieldPathIteratorHandler; +import com.yahoo.document.datatypes.FieldPathIteratorHandler.ModificationStatus; +import com.yahoo.document.datatypes.FieldValue; +import com.yahoo.document.datatypes.Struct; +import com.yahoo.document.serialization.DocumentReader; +import com.yahoo.document.serialization.DocumentWriter; +import com.yahoo.document.serialization.FieldReader; +import com.yahoo.document.serialization.FieldWriter; +import com.yahoo.document.serialization.SerializationException; +import com.yahoo.document.serialization.XmlStream; +import com.yahoo.vespa.objects.Serializer; + +import java.io.OutputStream; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +/** + * This is a facade to a Document, with multiple purposes: <ul> <li>Getters and setters for field data takes possibly + * into account a schema map of field names. <li>We support mapping into struct fields of arbitrary depth using + * from→mystruct.mystruct.myfield </ul> We also enforce the @Accesses annotation(s) of the doc proc which uses this. + * + * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a> + */ +public class ProxyDocument extends Document implements DocumentOperationWrapper { + + private static final long serialVersionUID = 1L; + private final Map<String, String> fieldMap; + private final Set<String> fieldsAllowed = new HashSet<>(); + private final String docProcName; + private Document doc; + + public ProxyDocument(DocumentProcessor docProc, Document doc, Map<String, String> fieldMap) { + super(doc); + if (docProc.getClass().getAnnotation(Accesses.class)!=null) { + for (com.yahoo.docproc.Accesses.Field field : docProc.getClass().getAnnotation(Accesses.class).value()) { + String name = field.name(); + if (fieldMap!=null && fieldMap.get(name) !=null) name = fieldMap.get(name); + fieldsAllowed.add(name); + } + } + this.fieldMap = fieldMap; + this.docProcName = docProc.toString(); + this.doc=doc; + } + + private void checkAccess(Field field) { + if (!fieldsAllowed.isEmpty() && !fieldsAllowed.contains(field.getName())) { + throw new IllegalArgumentException("Processor '" + docProcName + "' is not allowed to access field '" + + field.getName() + "'."); + } + } + + /** + * note that the returned Field may not be in this Document + * directly, but may refer to a field in a struct contained in it, + * in which case the returned Field is only useful for obtaining + * the field type; it can't be used for get() and set(). + **/ + @Override + public Field getField(String fieldName) { + if (fieldMap != null && fieldMap.containsKey(fieldName)) { + fieldName = fieldMap.get(fieldName); + } + FieldPath path = getFieldPath(fieldName); + Field ret = path.get(path.size() - 1).getFieldRef(); + checkAccess(ret); + return ret; + } + + @Override + public FieldValue getFieldValue(String fieldName) { + return getRecursiveValue(getFieldPath(fieldName)); + } + + @Override + public FieldValue getFieldValue(Field field) { + //checkAccess(field); + return doc.getFieldValue(field); + } + + @Override + public FieldValue setFieldValue(String fieldName, FieldValue fieldValue) { + SetHandler handler = new SetHandler(fieldValue); + FieldPath path = getFieldPath(fieldName); + iterateNested(path, 0, handler); + if (!handler.doModifyCalled) { + //the value in question was not found + throw new IllegalArgumentException("Field '" + fieldName + "' mapped by '" + path + "' was not found."); + } + return handler.prevVal; + } + + @Override + public FieldValue setFieldValue(Field field, FieldValue fieldValue) { + checkAccess(field); + return doc.setFieldValue(field, fieldValue); + } + + @Override + public FieldValue removeFieldValue(String fieldName) { + RemoveHandler handler = new RemoveHandler(); + FieldPath path = getFieldPath(fieldName); + iterateNested(path, 0, handler); + if (!handler.doModifyCalled) { + //the value in question was not found + throw new IllegalArgumentException("Field '" + fieldName + "' mapped by '" + path + "' was not found."); + } + return handler.prevVal; + } + + @Override + public FieldValue removeFieldValue(Field field) { + checkAccess(field); + return doc.removeFieldValue(field); + } + + private FieldPath getFieldPath(String fieldName) { + if (fieldMap != null && fieldMap.containsKey(fieldName)) { + fieldName = fieldMap.get(fieldName); + } + checkAccess(new Field(fieldName)); + FieldPath path = FieldPath.newInstance(getDataType(), fieldName); + if (path == null || path.size() == 0) { + throw new IllegalArgumentException("Malformed schema mapping '" + fieldName + "'."); + } + return path; + } + + public DocumentOperation getWrappedDocumentOperation() { + return new DocumentPut(this); + } + + private static class SetHandler extends FieldPathIteratorHandler { + + private final FieldValue nextVal; + private FieldValue prevVal; + private boolean doModifyCalled = false; + + public SetHandler(FieldValue fieldValue) { + nextVal = fieldValue; + } + + @Override + public boolean onComplex(FieldValue fieldVal) { + return false; + } + + @Override + public boolean createMissingPath() { + return true; + } + + + @Override + public ModificationStatus doModify(FieldValue fieldVal) { + doModifyCalled = true; + prevVal = fieldVal.clone(); + fieldVal.assign(nextVal); + return ModificationStatus.MODIFIED; + } + } + + @Override + public boolean equals(Object o) { + return doc.equals(o); + } + + @Override + public String toString() { + return doc.toString(); + } + + @Override + public int hashCode() { + return doc.hashCode(); + } + + @Override + public Document clone() { + return doc.clone(); + } + + @Override + public void clear() { + doc.clear(); + } + + @Override + public Iterator<Entry<Field, FieldValue>> iterator() { + return doc.iterator(); + } + + @Override + public DocumentId getId() { + return doc.getId(); + } + + @Override + public void setLastModified(Long lastModified) { + doc.setLastModified(lastModified); + } + + @Override + public Long getLastModified() { + return doc.getLastModified(); + } + + @Override + public void setId(DocumentId id) { + doc.setId(id); + } + + @Override + public Struct getHeader() { + return doc.getHeader(); + } + + @Override + public Struct getBody() { + return doc.getBody(); + } + + @Override + public void assign(Object o) { + doc.assign(o); + } + + @Override + public void setDataType(DataType type) { + doc.setDataType(type); + } + + @Override + public int getSerializedSize() throws SerializationException { + return doc.getSerializedSize(); + } + + @Override + public void serialize(OutputStream out) throws SerializationException { + doc.serialize(out); + } + + @Override + protected void doSetFieldValue(Field field, FieldValue value) { + super.doSetFieldValue(field, value); + } + + @Override + public String toXML(String indent) { + return doc.toXML(indent); + } + + @Override + public String toXml() { + return doc.toXml(); + } + + @Override + public void printXml(XmlStream xml) { + doc.printXml(xml); + } + + @Override + public void onSerialize(Serializer target) throws SerializationException { + doc.onSerialize(target); + } + + @Override + public void serializeHeader(Serializer target) throws SerializationException { + doc.serializeHeader(target); + } + + @Override + public void serializeBody(Serializer target) throws SerializationException { + doc.serializeBody(target); + } + + @Override + public DocumentType getDataType() { + return doc.getDataType(); + } + + @Override + public int getFieldCount() { + return doc.getFieldCount(); + } + + @Override + public void serialize(DocumentWriter writer) { + doc.serialize(writer); + } + + @Override + public void deserialize(DocumentReader reader) { + doc.deserialize(reader); + } + + @Override + public void serialize(Field field, FieldWriter writer) { + doc.serialize(field, writer); + } + + @Override + public void deserialize(Field field, FieldReader reader) { + doc.deserialize(field, reader); + } + + @Override + public int compareTo(FieldValue fieldValue) { + return super.compareTo(fieldValue); + } + + @Override + public ModificationStatus iterateNested(FieldPath fieldPath, int pos, + FieldPathIteratorHandler handler) { + return doc.iterateNested(fieldPath, pos, handler); + } + + private static class RemoveHandler extends FieldPathIteratorHandler { + private boolean doModifyCalled = false; + private FieldValue prevVal; + + @Override + public boolean onComplex(FieldValue fieldVal) { + return false; + } + + @Override + public ModificationStatus doModify(FieldValue fieldVal) { + doModifyCalled = true; + prevVal = fieldVal.clone(); + return ModificationStatus.REMOVED; + } + } + + /** + * The {@link Document} which this proxies + * @return The proxied Document + */ + public Document getDocument() { + return doc; + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java new file mode 100644 index 00000000000..5892fc349e2 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java @@ -0,0 +1,119 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.proxy; +import com.yahoo.docproc.DocumentOperationWrapper; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.Field; +import com.yahoo.document.serialization.DocumentUpdateWriter; +import com.yahoo.document.update.FieldUpdate; + +import java.util.List; +import java.util.Map; + +/** + * Schema mapped facade to a DocumentUpdate + * @author vegardh + * + */ +public class ProxyDocumentUpdate extends DocumentUpdate implements DocumentOperationWrapper { + + private DocumentUpdate docU; + /** + * The field name map for schema mapping. The key is the field name that the docproc uses. The value is the actual name of the field + * in the document. + */ + private Map<String, String> fieldMap; + + public ProxyDocumentUpdate(DocumentUpdate docUpd, Map<String, String> fieldMap) { + super(docUpd.getType(), docUpd.getId().toString()+"-schemamappedupdate"); + this.docU=docUpd; + this.fieldMap=fieldMap; + } + + @Override + public DocumentType getDocumentType() { + return docU.getDocumentType(); + } + + @Override + public FieldUpdate getFieldUpdate(Field field) { + return getFieldUpdate(field.getName()); + } + + @Override + public FieldUpdate getFieldUpdate(int index) { + return docU.getFieldUpdate(index); + } + + @Override + public FieldUpdate getFieldUpdate(String fieldName) { + String mapped = fieldMap.get(fieldName); + if (mapped==null) { + return docU.getFieldUpdate(fieldName); + } + // TODO how about structs here? + return docU.getFieldUpdate(mapped); + } + + @Override + public List<FieldUpdate> getFieldUpdates() { + return docU.getFieldUpdates(); + } + + @Override + public DocumentId getId() { + return docU.getId(); + } + + @Override + public DocumentType getType() { + return docU.getType(); + } + + @Override + public DocumentUpdate addFieldUpdate(FieldUpdate fieldUpdate) { + return docU.addFieldUpdate(fieldUpdate); + } + + @Override + public DocumentUpdate applyTo(Document doc) { + return docU.applyTo(doc); + } + + @Override + public boolean equals(Object o) { + return docU.equals(o); + } + + @Override + public int hashCode() { + return docU.hashCode(); + } + + @Override + public void serialize(DocumentUpdateWriter data) { + docU.serialize(data); + } + + @Override + public int size() { + return docU.size(); + } + + @Override + public String toString() { + return docU.toString(); + } + + @Override + public DocumentOperation getWrappedDocumentOperation() { + DocumentOperation innermostDocOp = docU; + while (innermostDocOp instanceof DocumentOperationWrapper) { + innermostDocOp = ((DocumentOperationWrapper) innermostDocOp).getWrappedDocumentOperation(); + } + return innermostDocOp; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/SchemaMap.java b/docproc/src/main/java/com/yahoo/docproc/proxy/SchemaMap.java new file mode 100644 index 00000000000..b27c8b725f2 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/proxy/SchemaMap.java @@ -0,0 +1,131 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.proxy; + +import com.yahoo.collections.Pair; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.config.docproc.SchemamappingConfig; +import com.yahoo.config.docproc.SchemamappingConfig.Fieldmapping; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.logging.Logger; + +/** + * Can be used to map field names from input doc into names used in a docproc that was + * written with generic field names. + * @author vegardh + * + */ +public class SchemaMap implements ConfigSubscriber.SingleSubscriber<SchemamappingConfig> { + /** + * Map key. Doctype can be null, not the others. + * @author vegardh + * + */ + class SchemaMapKey { + + private final String chain; + private final String docproc; + private final String doctype; + private final String inDocument; + + public SchemaMapKey(String chain, String docproc, String doctype, String from) { + this.chain = chain; + this.docproc = docproc; + this.doctype = doctype; + this.inDocument = from; + if (chain==null) throw new IllegalArgumentException("'chain' cannot be null in schema map."); + if (docproc==null) throw new IllegalArgumentException("'docproc' cannot be null in schema map."); + if (from==null) throw new IllegalArgumentException("'from' cannot be null in schema map."); + } + public String getChain() { + return chain; + } + public String getDocproc() { + return docproc; + } + public String getDoctype() { + return doctype; + } + public String getInDocument() { + return inDocument; + } + + private boolean equalType(SchemaMapKey other) { + if (doctype==null) return other.getDoctype()==null; + return doctype.equals(other.getDoctype()); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof SchemaMapKey)) return false; + SchemaMapKey other = (SchemaMapKey)obj; + return other.getChain().equals(chain) && + other.getDocproc().equals(docproc) && + other.getInDocument().equals(inDocument) && + equalType(other); + } + @Override + public int hashCode() { + return chain.hashCode()+docproc.hashCode()+(doctype!=null?doctype.hashCode():0)+inDocument.hashCode(); + } + + } + + // (key->inProcessor),... + private final ConfigSubscriber subscriber; + private Map<SchemaMapKey, String> fields = new HashMap<>(); + + void addMapping(String chain, String docproc, String doctype, String inDocument, String inProcessor) { + fields.put(new SchemaMapKey(chain, docproc, doctype, inDocument), inProcessor); + } + + /** + * New map from the docproc cluster's config id + * @param configid can be null. Will not get anything from config in that case. + */ + public SchemaMap(String configid) { + subscriber = new ConfigSubscriber(); + if (configid!=null) { + subscriber.subscribe(this, SchemamappingConfig.class, configid); + } + } + + public SchemaMap() { + this(null); + } + + @Override + public void configure(SchemamappingConfig config) { + if (config == null) { + return; + } + fields.clear(); + for (Fieldmapping m: config.fieldmapping()) { + SchemaMapKey key = new SchemaMapKey(m.chain(), m.docproc(), ("".equals(m.doctype())?null:m.doctype()), m.indocument()); + fields.put(key, m.inprocessor()); + } + } + + /** + * The map for a given chain,docproc: + * "Reverses" the direction, this is the mapping a docproc should do when a + * doc comes in. The doctype is null if not given in map. + * + * @return (doctype,inProcessor)→inDocument + */ + public Map<Pair<String,String>, String> chainMap(String chain, String docproc) { + Map<Pair<String, String>, String> ret = new HashMap<>(); + for (Entry<SchemaMapKey, String> e : fields.entrySet()) { + SchemaMapKey key = e.getKey(); + if (key.getChain().equals(chain) && key.getDocproc().equals(docproc)) { + // Reverse direction here + ret.put(new Pair<>(key.getDoctype(),e.getValue()), key.getInDocument()); + } + } + return ret; + } + + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/util/JoinerDocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/util/JoinerDocumentProcessor.java new file mode 100644 index 00000000000..f934d64736f --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/util/JoinerDocumentProcessor.java @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.util; + +import com.yahoo.component.ComponentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.config.docproc.SplitterJoinerDocumentProcessorConfig; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentTypeManagerConfigurer; +import com.yahoo.document.datatypes.Array; +import com.yahoo.log.LogLevel; + +import java.util.logging.Logger; + +import static com.yahoo.docproc.util.SplitterDocumentProcessor.validate; +import static com.yahoo.docproc.util.SplitterDocumentProcessor.doProcessOuterDocument; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class JoinerDocumentProcessor extends DocumentProcessor { + + private static Logger log = Logger.getLogger(JoinerDocumentProcessor.class.getName()); + private String documentTypeName; + private String arrayFieldName; + private String contextFieldName; + DocumentTypeManager manager; + + public JoinerDocumentProcessor(SplitterJoinerDocumentProcessorConfig cfg, DocumentmanagerConfig documentmanagerConfig) { + super(); + this.documentTypeName = cfg.documentTypeName(); + this.arrayFieldName = cfg.arrayFieldName(); + this.contextFieldName = cfg.contextFieldName(); + manager = DocumentTypeManagerConfigurer.configureNewManager(documentmanagerConfig); + validate(manager, documentTypeName, arrayFieldName); + } + + @Override + public Progress process(Processing processing) { + if ( ! doProcessOuterDocument(processing.getVariable(contextFieldName), documentTypeName)) { + return Progress.DONE; + } + + DocumentPut outerDoc = (DocumentPut)processing.getVariable(contextFieldName); + + Array<Document> innerDocuments = (Array<Document>) outerDoc.getDocument().getFieldValue(arrayFieldName); + + if (innerDocuments == null) { + innerDocuments = (Array<Document>) outerDoc.getDocument().getDataType().getField(arrayFieldName).getDataType().createFieldValue(); + } + + for (DocumentOperation op : processing.getDocumentOperations()) { + if (op instanceof DocumentPut) { + innerDocuments.add(((DocumentPut)op).getDocument()); + } else { + log.log(LogLevel.DEBUG, "Skipping: " + op); + } + } + processing.getDocumentOperations().clear(); + processing.getDocumentOperations().add(outerDoc); + processing.removeVariable(contextFieldName); + return Progress.DONE; + } +} diff --git a/docproc/src/main/java/com/yahoo/docproc/util/SplitterDocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/util/SplitterDocumentProcessor.java new file mode 100644 index 00000000000..ed45435d6ef --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/util/SplitterDocumentProcessor.java @@ -0,0 +1,147 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.util; + +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.config.docproc.SplitterJoinerDocumentProcessorConfig; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.ArrayDataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentTypeManagerConfigurer; +import com.yahoo.document.datatypes.Array; +import com.yahoo.log.LogLevel; + +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class SplitterDocumentProcessor extends DocumentProcessor { + + private static Logger log = Logger.getLogger(SplitterDocumentProcessor.class.getName()); + private String documentTypeName; + private String arrayFieldName; + private String contextFieldName; + DocumentTypeManager manager; + + public SplitterDocumentProcessor(SplitterJoinerDocumentProcessorConfig cfg, DocumentmanagerConfig documentmanagerConfig) { + super(); + this.documentTypeName = cfg.documentTypeName(); + this.arrayFieldName = cfg.arrayFieldName(); + this.contextFieldName = cfg.contextFieldName(); + this.manager = DocumentTypeManagerConfigurer.configureNewManager(documentmanagerConfig); + validate(manager, documentTypeName, arrayFieldName); + } + + @Override + public Progress process(Processing processing) { + if (processing.getDocumentOperations().size() != 1) { + //we were given more than one document, return + log.log(LogLevel.DEBUG, "More than one document given, returning. (Was given " + + processing.getDocumentOperations().size() + " documents)."); + return Progress.DONE; + } + + if (!doProcessOuterDocument(processing.getDocumentOperations().get(0), documentTypeName)) { + return Progress.DONE; + } + + Document outerDoc = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();; + + Array<Document> innerDocuments = (Array<Document>) outerDoc.getFieldValue(arrayFieldName); + if (innerDocuments == null) { + //the document does not have the field, return + log.log(LogLevel.DEBUG, "The given Document does not have a field value for field " + + arrayFieldName + ", returning. (Was given " + outerDoc + ")."); + return Progress.DONE; + } + + if (innerDocuments.size() == 0) { + //the array is empty, return + log.log(LogLevel.DEBUG, "The given Document does not have any elements in array field " + + arrayFieldName + ", returning. (Was given " + outerDoc + ")."); + return Progress.DONE; + } + + split(processing, innerDocuments); + return Progress.DONE; + } + + private void split(Processing processing, Array<Document> innerDocuments) { + processing.setVariable(contextFieldName, processing.getDocumentOperations().get(0)); + processing.getDocumentOperations().clear(); + processing.getDocumentOperations().addAll(innerDocuments.stream() + .map(DocumentPut::new) + .collect(Collectors.toList())); + + innerDocuments.clear(); + } + + + static void validate(DocumentTypeManager manager, String documentTypeName, String arrayFieldName) { + DocumentType docType = manager.getDocumentType(documentTypeName); + + if (docType == null) { + //the document type does not exist, return + throw new IllegalStateException("The document type " + documentTypeName + " is not deployed."); + } + + if (docType.getField(arrayFieldName) == null) { + //the document type does not have the field, return + throw new IllegalStateException("The document type " + documentTypeName + + " does not have a field named " + arrayFieldName + "."); + } + + if (!(docType.getField(arrayFieldName).getDataType() instanceof ArrayDataType)) { + //the data type of the field is wrong, return + throw new IllegalStateException("The data type of the field named " + + arrayFieldName + " in document type " + documentTypeName + + " is not an array type"); + } + + ArrayDataType fieldDataType = (ArrayDataType) docType.getField(arrayFieldName).getDataType(); + + if (!(fieldDataType.getNestedType() instanceof DocumentType)) { + //the subtype of tye array data type of the field is wrong, return + throw new IllegalStateException("The data type of the field named " + + arrayFieldName + " in document type " + documentTypeName + + " is not an array of Document."); + } + } + + static boolean doProcessOuterDocument(Object o, String documentTypeName) { + if ( ! (o instanceof DocumentOperation)) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, o + " is not a DocumentOperation."); + } + return false; + } + + DocumentOperation outerDocOp = (DocumentOperation)o; + if ( ! (outerDocOp instanceof DocumentPut)) { + //this is not a put, return + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Given DocumentOperation is not a DocumentPut, returning. (Was given " + + outerDocOp + ")."); + } + return false; + } + + Document outerDoc = ((DocumentPut) outerDocOp).getDocument(); + DocumentType type = outerDoc.getDataType(); + if (!type.getName().equalsIgnoreCase(documentTypeName)) { + //this is not the right document type + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, "Given Document is of wrong type, returning. (Was given " + outerDoc + ")."); + } + return false; + } + return true; + } + +} diff --git a/docproc/src/main/java/com/yahoo/docproc/util/package-info.java b/docproc/src/main/java/com/yahoo/docproc/util/package-info.java new file mode 100644 index 00000000000..3becb4f2d15 --- /dev/null +++ b/docproc/src/main/java/com/yahoo/docproc/util/package-info.java @@ -0,0 +1,7 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi +package com.yahoo.docproc.util; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/docproc/src/main/resources/configdefinitions/docproc.def b/docproc/src/main/resources/configdefinitions/docproc.def new file mode 100644 index 00000000000..a3e89c22226 --- /dev/null +++ b/docproc/src/main/resources/configdefinitions/docproc.def @@ -0,0 +1,12 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +version=12 +namespace=config.docproc + +# Queue size (in milliseconds) for this node +# Positive size gives a ThroughPutLimitQueue. ### Experimental. +# 0 Gives a PriorityQueue +# Negative values gives a ordinary LinkedBlockingQueue. This is fastest and most efficient. +maxqueuetimems int default=-1 + +#The number of threads in the DocprocHandler worker thread pool +numthreads int default=-1 diff --git a/docproc/src/main/resources/configdefinitions/schemamapping.def b/docproc/src/main/resources/configdefinitions/schemamapping.def new file mode 100644 index 00000000000..a8e0243afac --- /dev/null +++ b/docproc/src/main/resources/configdefinitions/schemamapping.def @@ -0,0 +1,20 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +# Use when a docproc works on a generic set of field names and the actual names +# in input doc may be different +version=1 +namespace=config.docproc + +# The chain this mapping applies to +fieldmapping[].chain string + +# The class name of the docproc this mapping applies to +fieldmapping[].docproc string + +#The doc type name this mapping applies to +fieldmapping[].doctype string default="" + +# Field name in input doc +fieldmapping[].indocument string + +# Field name a docproc uses +fieldmapping[].inprocessor string diff --git a/docproc/src/main/resources/configdefinitions/splitter-joiner-document-processor.def b/docproc/src/main/resources/configdefinitions/splitter-joiner-document-processor.def new file mode 100644 index 00000000000..73e84578ed9 --- /dev/null +++ b/docproc/src/main/resources/configdefinitions/splitter-joiner-document-processor.def @@ -0,0 +1,12 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +version=2 +namespace=config.docproc + +# The name of the enclosing (outer) document type +documentTypeName string + +# The name of the field of type array of document +arrayFieldName string + +# The name of the context variable used by Processing.getVariable() +contextFieldName string default="docproc@splitter@joiner@outer@document" diff --git a/docproc/src/test/cfg/docproc-chain-arguments.cfg b/docproc/src/test/cfg/docproc-chain-arguments.cfg new file mode 100644 index 00000000000..bbd66d7591d --- /dev/null +++ b/docproc/src/test/cfg/docproc-chain-arguments.cfg @@ -0,0 +1,20 @@ +docprocchain[1] +docprocchain[0].name "chain-arguments" +docprocchain[0].processor[3] + +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.DocprocServiceArgumentConfigurationTestCase$TestDocumentProcessor1" +docprocchain[0].processor[0].argument[2] +docprocchain[0].processor[0].argument[0].name arg1 +docprocchain[0].processor[0].argument[0].value val1 +docprocchain[0].processor[0].argument[1].name arg2 +docprocchain[0].processor[0].argument[1].value val2 + +docprocchain[0].processor[1].classname "com.yahoo.docproc.configuration.test.DocprocServiceArgumentConfigurationTestCase$TestDocumentProcessor2" +docprocchain[0].processor[1].argument[1] +docprocchain[0].processor[1].argument[0].name arg1 +docprocchain[0].processor[1].argument[0].value val3 + +docprocchain[0].processor[2].classname "com.yahoo.docproc.configuration.test.DocprocServiceArgumentConfigurationTestCase$TestDocumentProcessor3" +docprocchain[0].processor[2].argument[1] +docprocchain[0].processor[2].argument[0].name arg2 +docprocchain[0].processor[2].argument[0].value val4 diff --git a/docproc/src/test/cfg/docproc-chain-empty.cfg b/docproc/src/test/cfg/docproc-chain-empty.cfg new file mode 100644 index 00000000000..091ed109913 --- /dev/null +++ b/docproc/src/test/cfg/docproc-chain-empty.cfg @@ -0,0 +1,6 @@ +docprocchain[2] +docprocchain[0].name "baloo" +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor1" +docprocchain[1].name "badoo" +docprocchain[1].processor[0] diff --git a/docproc/src/test/cfg/docproc-chain-parameters.cfg b/docproc/src/test/cfg/docproc-chain-parameters.cfg new file mode 100644 index 00000000000..0780030bd3b --- /dev/null +++ b/docproc/src/test/cfg/docproc-chain-parameters.cfg @@ -0,0 +1,24 @@ +docprocchain[1] +docprocchain[0].name "parameters" +docprocchain[0].processor[3] + +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.DocprocServiceParametersConfigurationTestCase$TestDocumentProcessor1" +docprocchain[0].processor[0].parameter[4] +docprocchain[0].processor[0].parameter[0].name param1 +docprocchain[0].processor[0].parameter[0].value val1 +docprocchain[0].processor[0].parameter[1].name param1 +docprocchain[0].processor[0].parameter[1].value val2 +docprocchain[0].processor[0].parameter[2].name param1 +docprocchain[0].processor[0].parameter[2].value val3 +docprocchain[0].processor[0].parameter[3].name param2 +docprocchain[0].processor[0].parameter[3].value val0 + +docprocchain[0].processor[1].classname "com.yahoo.docproc.configuration.test.DocprocServiceParametersConfigurationTestCase$TestDocumentProcessor2" +docprocchain[0].processor[1].parameter[1] +docprocchain[0].processor[1].parameter[0].name param1 +docprocchain[0].processor[1].parameter[0].value val3 + +docprocchain[0].processor[2].classname "com.yahoo.docproc.configuration.test.DocprocServiceParametersConfigurationTestCase$TestDocumentProcessor3" +docprocchain[0].processor[2].parameter[1] +docprocchain[0].processor[2].parameter[0].name param2 +docprocchain[0].processor[2].parameter[0].value val4 diff --git a/docproc/src/test/cfg/docproc-chain-slow.cfg b/docproc/src/test/cfg/docproc-chain-slow.cfg new file mode 100644 index 00000000000..27cedd9407c --- /dev/null +++ b/docproc/src/test/cfg/docproc-chain-slow.cfg @@ -0,0 +1,4 @@ +docprocchain[1] +docprocchain[0].name "slow" +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.DocprocServiceConfigurationTestCase$SlowDocumentProcessor" diff --git a/docproc/src/test/cfg/docproc-chain.cfg b/docproc/src/test/cfg/docproc-chain.cfg new file mode 100644 index 00000000000..ed40e889004 --- /dev/null +++ b/docproc/src/test/cfg/docproc-chain.cfg @@ -0,0 +1,5 @@ +docprocchain[1] +docprocchain[0].processor[3] +docprocchain[0].processor[0].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor1" +docprocchain[0].processor[1].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor2" +docprocchain[0].processor[2].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor3" diff --git a/docproc/src/test/cfg/invalid/docproc-chain-nomapconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-nomapconstructor.cfg new file mode 100644 index 00000000000..21b6b2e4e57 --- /dev/null +++ b/docproc/src/test/cfg/invalid/docproc-chain-nomapconstructor.cfg @@ -0,0 +1,7 @@ +docprocchain[1] +docprocchain[0].name "nomapconstructor" +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$NoMapConstructorDocumentProcessor" +docprocchain[0].processor[0].parameter[1] +docprocchain[0].processor[0].parameter[0].name "dis" +docprocchain[0].processor[0].parameter[0].value "dat" diff --git a/docproc/src/test/cfg/invalid/docproc-chain-nopublicconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-nopublicconstructor.cfg new file mode 100644 index 00000000000..c710a4726a7 --- /dev/null +++ b/docproc/src/test/cfg/invalid/docproc-chain-nopublicconstructor.cfg @@ -0,0 +1,4 @@ +docprocchain[1] +docprocchain[0].name "nopublicconstructor" +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$NoPublicConstructorDocumentProcessor" diff --git a/docproc/src/test/cfg/invalid/docproc-chain-nostringconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-nostringconstructor.cfg new file mode 100644 index 00000000000..002210daf1f --- /dev/null +++ b/docproc/src/test/cfg/invalid/docproc-chain-nostringconstructor.cfg @@ -0,0 +1,4 @@ +docprocchain[1] +docprocchain[0].name "nostringconstructor" +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$NoStringConstructorDocumentProcessor" diff --git a/docproc/src/test/cfg/invalid/docproc-chain-ok.cfg b/docproc/src/test/cfg/invalid/docproc-chain-ok.cfg new file mode 100644 index 00000000000..b6a2a71d87f --- /dev/null +++ b/docproc/src/test/cfg/invalid/docproc-chain-ok.cfg @@ -0,0 +1,4 @@ +docprocchain[1] +docprocchain[0].name "ok" +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$OkDocumentProcessor" diff --git a/docproc/src/test/cfg/invalid/docproc-chain-throwingexceptioninconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-throwingexceptioninconstructor.cfg new file mode 100644 index 00000000000..a8296c14d20 --- /dev/null +++ b/docproc/src/test/cfg/invalid/docproc-chain-throwingexceptioninconstructor.cfg @@ -0,0 +1,4 @@ +docprocchain[1] +docprocchain[0].name "throwingexcaptioninconstructor" +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$ThrowingExceptionInConstructorDocumentProcessor" diff --git a/docproc/src/test/cfg/messagebus/docproc-chain.cfg b/docproc/src/test/cfg/messagebus/docproc-chain.cfg new file mode 100644 index 00000000000..8496ebfb05c --- /dev/null +++ b/docproc/src/test/cfg/messagebus/docproc-chain.cfg @@ -0,0 +1,2 @@ +docprocchain[1] +docprocchain[0].processor[0] diff --git a/docproc/src/test/cfg/messagebus/docproc.cfg b/docproc/src/test/cfg/messagebus/docproc.cfg new file mode 100644 index 00000000000..ef67a6993b0 --- /dev/null +++ b/docproc/src/test/cfg/messagebus/docproc.cfg @@ -0,0 +1,4 @@ +rpcserver.enabled false +rpcserver.port 6985 +slobrok.enabled false +slobrok.servicename "dpcluster.1/docproc/78" diff --git a/docproc/src/test/cfg/messagebus/documentmanager.cfg b/docproc/src/test/cfg/messagebus/documentmanager.cfg new file mode 100644 index 00000000000..36cf88c8384 --- /dev/null +++ b/docproc/src/test/cfg/messagebus/documentmanager.cfg @@ -0,0 +1,36 @@ +datatype[3] +datatype[0].id 306916075 +datatype[0].arraytype[0] +datatype[0].weightedsettype[0] +datatype[0].structtype[1] +datatype[0].structtype[0].name test.header +datatype[0].structtype[0].version 0 +datatype[0].structtype[0].field[3] +datatype[0].structtype[0].field[0].name test +datatype[0].structtype[0].field[0].id[0] +datatype[0].structtype[0].field[0].datatype 2 +datatype[0].structtype[0].field[1].name touched +datatype[0].structtype[0].field[1].id[0] +datatype[0].structtype[0].field[1].datatype 2 +datatype[0].structtype[0].field[2].name docno +datatype[0].structtype[0].field[2].id[0] +datatype[0].structtype[0].field[2].datatype 0 +datatype[0].documenttype[0] +datatype[1].id -1270491200 +datatype[1].arraytype[0] +datatype[1].weightedsettype[0] +datatype[1].structtype[1] +datatype[1].structtype[0].name test.body +datatype[1].structtype[0].version 0 +datatype[1].structtype[0].field[0] +datatype[1].documenttype[0] +datatype[2].id -877171244 +datatype[2].arraytype[0] +datatype[2].weightedsettype[0] +datatype[2].structtype[0] +datatype[2].documenttype[1] +datatype[2].documenttype[0].name test +datatype[2].documenttype[0].version 0 +datatype[2].documenttype[0].inherits[0] +datatype[2].documenttype[0].headerstruct 306916075 +datatype[2].documenttype[0].bodystruct -1270491200 diff --git a/docproc/src/test/cfg/server/docproc-chain.cfg b/docproc/src/test/cfg/server/docproc-chain.cfg new file mode 100644 index 00000000000..0c7b14eb657 --- /dev/null +++ b/docproc/src/test/cfg/server/docproc-chain.cfg @@ -0,0 +1,3 @@ +docprocchain[1] +docprocchain[0].processor[1] +docprocchain[0].processor[0].classname "com.yahoo.docproc.server.test.ServerTestCase$BalooDocumentProcessor" diff --git a/docproc/src/test/cfg/server/docproc.cfg b/docproc/src/test/cfg/server/docproc.cfg new file mode 100644 index 00000000000..602f6fc4f10 --- /dev/null +++ b/docproc/src/test/cfg/server/docproc.cfg @@ -0,0 +1,4 @@ +rpcserver.enabled true +rpcserver.port 6985 +slobrok.enabled false +slobrok.servicename dpcluster.1/docproc/78 diff --git a/docproc/src/test/cfg/server/documentmanager.cfg b/docproc/src/test/cfg/server/documentmanager.cfg new file mode 100644 index 00000000000..b3e360e961f --- /dev/null +++ b/docproc/src/test/cfg/server/documentmanager.cfg @@ -0,0 +1,36 @@ +datatype[3] +datatype[0].id 306916075 +datatype[0].arraytype[0] +datatype[0].weightedsettype[0] +datatype[0].structtype[1] +datatype[0].structtype[0].name test.header +datatype[0].structtype[0].version 0 +datatype[0].structtype[0].field[2] +datatype[0].structtype[0].field[0].name test +datatype[0].structtype[0].field[0].id[0] +datatype[0].structtype[0].field[0].datatype 2 +datatype[0].structtype[0].field[1].name touched +datatype[0].structtype[0].field[1].id[0] +datatype[0].structtype[0].field[1].datatype 2 +datatype[0].documenttype[0] +datatype[1].id -1270491200 +datatype[1].arraytype[0] +datatype[1].weightedsettype[0] +datatype[1].structtype[1] +datatype[1].structtype[0].name test.body +datatype[1].structtype[0].version 0 +datatype[1].structtype[0].field[1] +datatype[1].structtype[0].field[0].name docno +datatype[1].structtype[0].field[0].id[0] +datatype[1].structtype[0].field[0].datatype 0 +datatype[1].documenttype[0] +datatype[2].id -877171244 +datatype[2].arraytype[0] +datatype[2].weightedsettype[0] +datatype[2].structtype[0] +datatype[2].documenttype[1] +datatype[2].documenttype[0].name test +datatype[2].documenttype[0].version 0 +datatype[2].documenttype[0].inherits[0] +datatype[2].documenttype[0].headerstruct 306916075 +datatype[2].documenttype[0].bodystruct -1270491200 diff --git a/docproc/src/test/java/com/yahoo/docproc/AccessesAnnotationTestCase.java b/docproc/src/test/java/com/yahoo/docproc/AccessesAnnotationTestCase.java new file mode 100644 index 00000000000..fedc0eb21f5 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/AccessesAnnotationTestCase.java @@ -0,0 +1,68 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.datatypes.IntegerFieldValue; +import com.yahoo.document.datatypes.StringFieldValue; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> + */ +public class AccessesAnnotationTestCase { + + @Test + public void requireThatFieldsAreRestricted() { + DocumentType type = new DocumentType("album"); + type.addField("title", DataType.STRING); + type.addField("artist", DataType.STRING); + type.addField("year", DataType.INT); + Document doc = new Document(type, new DocumentId("doc:map:test:1")); + + MyDocProc docProc = new MyDocProc(); + DocumentPut put = new DocumentPut(doc); + Document proxy = new Call(docProc).configDoc(docProc, put).getDocument(); + proxy.setFieldValue("title", new StringFieldValue("foo")); + try { + proxy.setFieldValue("year", new IntegerFieldValue(69)); + fail("Should have failed"); + } catch (Exception e) { + System.out.println(e.getMessage()); + assertTrue(e.getMessage().matches(".*not allowed.*")); + } + + proxy.getFieldValue("title"); + try { + proxy.getFieldValue("year"); + fail("Should have failed"); + } catch (Exception e) { + System.out.println(e.getMessage()); + assertTrue(e.getMessage().matches(".*not allowed.*")); + } + } + + @Accesses({ + @Accesses.Field(name = "title", dataType = "string", description = "What is done on field title", + annotations = @Accesses.Field.Tree(produces = { "sentences" })), + @Accesses.Field(name = "artist", dataType = "string", description = "What is done on field artist", + annotations = { + @Accesses.Field.Tree(name = "root", produces = { "sentences" }), + @Accesses.Field.Tree(name = "root2", consumes = { "places" }) + }), + @Accesses.Field(name = "track", dataType = "string", description = "What is done on field track") + }) + class MyDocProc extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + return null; + } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/CallStackTestCase.java b/docproc/src/test/java/com/yahoo/docproc/CallStackTestCase.java new file mode 100644 index 00000000000..8ef832faf31 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/CallStackTestCase.java @@ -0,0 +1,291 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import java.util.Iterator; + +/** + * Tests call stacks + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a> + */ +public class CallStackTestCase extends junit.framework.TestCase { + + private CallStack callStack, insertStack; + + private Call call1, call2, call3, noCall, newCall, insert1, insert2, insert3; + + private DocumentProcessor processor2, noProcessor; + + public CallStackTestCase(String name) { + super(name); + } + + protected void setUp() { + callStack = new CallStack(); + call1 = new Call(new TestDocumentProcessor()); + processor2 = new TestDocumentProcessor(); + call2 = new Call(processor2); + call3 = new Call(new TestDocumentProcessor()); + callStack.addLast(call1).addLast(call2).addLast(call3); + noProcessor = new TestDocumentProcessor(); + noCall = new Call(noProcessor); + newCall = new Call(new TestDocumentProcessor()); + + insert1 = new Call(new TestDocumentProcessor()); + insert2 = new Call(new TestDocumentProcessor()); + insert3 = new Call(new TestDocumentProcessor()); + insertStack = new CallStack(); + insertStack.addLast(insert1).addLast(insert2).addLast(insert3); + } + + public void testFind() { + assertSame(call2, callStack.findCall(processor2)); + assertSame(call2, callStack.findCall(processor2.getId())); + assertNull(callStack.findCall(noProcessor)); + assertNull(callStack.findCall(noProcessor.getId())); + assertNull(callStack.findCall(new TestDocumentProcessor())); + } + + public void testAddBefore() { + callStack.addBefore(call2, newCall); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(newCall, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddStackBefore() { + callStack.addBefore(call2, insertStack); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(insert1, i.next()); + assertEquals(insert2, i.next()); + assertEquals(insert3, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddAfter() { + callStack.addAfter(call2, newCall); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(newCall, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddStackAfter() { + callStack.addAfter(call2, insertStack); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(insert1, i.next()); + assertEquals(insert2, i.next()); + assertEquals(insert3, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddBeforeFirst() { + callStack.addBefore(call1, newCall); + Iterator i = callStack.iterator(); + assertEquals(newCall, i.next()); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddStackBeforeFirst() { + callStack.addBefore(call1, insertStack); + Iterator i = callStack.iterator(); + assertEquals(insert1, i.next()); + assertEquals(insert2, i.next()); + assertEquals(insert3, i.next()); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddAfterLast() { + callStack.addAfter(call3, newCall); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertEquals(newCall, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddStackAfterLast() { + callStack.addAfter(call3, insertStack); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertEquals(insert1, i.next()); + assertEquals(insert2, i.next()); + assertEquals(insert3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddBeforeNonExisting() { + callStack.addBefore(noCall, newCall); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertEquals(newCall, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddStackBeforeNonExisting() { + callStack.addBefore(noCall, insertStack); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertEquals(insert1, i.next()); + assertEquals(insert2, i.next()); + assertEquals(insert3, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddAfterNonExisting() { + callStack.addAfter(noCall, newCall); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertEquals(newCall, i.next()); + assertFalse(i.hasNext()); + } + + public void testAddStackAfterNonExisting() { + callStack.addAfter(noCall, insertStack); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertEquals(insert1, i.next()); + assertEquals(insert2, i.next()); + assertEquals(insert3, i.next()); + assertFalse(i.hasNext()); + } + + public void testRemove() { + callStack.remove(call1); + Iterator i = callStack.iterator(); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testRemoveNonExisting() { + callStack.remove(noCall); + Iterator i = callStack.iterator(); + assertEquals(call1, i.next()); + assertEquals(call2, i.next()); + assertEquals(call3, i.next()); + assertFalse(i.hasNext()); + } + + public void testContains() { + callStack.addLast(newCall); + assertTrue(callStack.contains(call1)); + assertTrue(callStack.contains(call2)); + assertTrue(callStack.contains(call3)); + assertTrue(callStack.contains(newCall)); + assertFalse(callStack.contains(noCall)); + } + + public void testPop() { + assertEquals(call1, callStack.pop()); + assertEquals(call2, callStack.pop()); + callStack.addNext(newCall); + + assertFalse(callStack.contains(call1)); + assertFalse(callStack.contains(call2)); + assertTrue(callStack.contains(call3)); + assertTrue(callStack.contains(newCall)); + + assertEquals(newCall, callStack.pop()); + assertTrue(callStack.contains(call3)); + assertFalse(callStack.contains(newCall)); + + assertEquals(call3, callStack.pop()); + assertFalse(callStack.contains(call3)); + + assertNull(callStack.pop()); + } + + public void testGetLastPopped() { + CallStack stakk = new CallStack(); + assertNull(stakk.getLastPopped()); + + Call call; + Call lastCall; + + call = callStack.pop(); + assertEquals(call1, call); + lastCall = callStack.getLastPopped(); + assertEquals(call1, lastCall); + assertEquals(call, lastCall); + + call = callStack.pop(); + assertEquals(call2, call); + lastCall = callStack.getLastPopped(); + assertEquals(call2, lastCall); + assertEquals(call, lastCall); + + call = callStack.pop(); + assertEquals(call3, call); + lastCall = callStack.getLastPopped(); + assertEquals(call3, lastCall); + assertEquals(call, lastCall); + + lastCall = callStack.getLastPopped(); + assertEquals(call3, lastCall); + assertEquals(call, lastCall); + + lastCall = callStack.getLastPopped(); + assertEquals(call3, lastCall); + assertEquals(call, lastCall); + + callStack.addLast(call1); + callStack.addLast(call2); + + lastCall = callStack.getLastPopped(); + assertEquals(call3, lastCall); + assertEquals(call, lastCall); + + call = callStack.pop(); + assertEquals(call1, call); + lastCall = callStack.getLastPopped(); + assertEquals(call1, lastCall); + assertEquals(call, lastCall); + + call = callStack.pop(); + assertEquals(call2, call); + lastCall = callStack.getLastPopped(); + assertEquals(call2, lastCall); + assertEquals(call, lastCall); + + lastCall = callStack.getLastPopped(); + assertEquals(call2, lastCall); + assertEquals(call, lastCall); + + lastCall = callStack.getLastPopped(); + assertEquals(call2, lastCall); + assertEquals(call, lastCall); + } + + private static class TestDocumentProcessor extends com.yahoo.docproc.SimpleDocumentProcessor { + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java b/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java new file mode 100644 index 00000000000..c51682b77c8 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java @@ -0,0 +1,90 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.datatypes.StringFieldValue; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class CallbackTestCase extends junit.framework.TestCase { + private DocumentPut put1; + private DocumentPut put2; + private List<DocumentOperation> operations = new ArrayList<>(2); + DocprocService service; + + + public void setUp() { + service = new DocprocService("callback"); + service.setCallStack(new CallStack().addNext(new TestCallbackDp())); + service.setInService(true); + + // Create documents + DocumentType type = new DocumentType("test"); + type.addField("status", DataType.STRING); + put1 = new DocumentPut(type, new DocumentId("doc:callback:test:1")); + put2 = new DocumentPut(type, new DocumentId("doc:callback:test:2")); + operations.add(new DocumentPut(type, new DocumentId("doc:callback:test:3"))); + operations.add(new DocumentPut(type, new DocumentId("doc:callback:test:4"))); + } + + public void testProcessingWithCallbackSingleDoc() { + ProcessingEndpoint drecv = new TestProcessingEndpoint(); + + service.process(put1, drecv); + while (service.doWork()) { } + + assertEquals(new StringFieldValue("received"), put1.getDocument().getFieldValue("status")); + } + + public void testProcessingWithCallbackMultipleDocs() { + ProcessingEndpoint drecv = new TestProcessingEndpoint(); + + service.process(toProcessing(operations), drecv); + while (service.doWork()) { } + + assertEquals(new StringFieldValue("received"), ((DocumentPut) operations.get(0)).getDocument().getFieldValue("status")); + assertEquals(new StringFieldValue("received"), ((DocumentPut) operations.get(1)).getDocument().getFieldValue("status")); + } + + private Processing toProcessing(List<DocumentOperation> documentOperations) { + Processing processing = new Processing(); + for (DocumentOperation op : documentOperations) + processing.addDocumentOperation(op); + return processing; + } + + public void testProcessingWithCallbackProcessing() { + ProcessingEndpoint drecv = new TestProcessingEndpoint(); + + Processing processing = new Processing("default", put2, service.getCallStack()); + + service.process(processing, drecv); + while (service.doWork()) { } + + assertEquals(new StringFieldValue("received"), put2.getDocument().getFieldValue("status")); + } + + public class TestProcessingEndpoint implements ProcessingEndpoint { + public void processingDone(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + ((DocumentPut)op).getDocument().setFieldValue("status", new StringFieldValue("received")); + } + } + + public void processingFailed(Processing processing, Exception exception) { + //do nothing here for now + } + } + + public static class TestCallbackDp extends com.yahoo.docproc.SimpleDocumentProcessor { + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java b/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java new file mode 100644 index 00000000000..cce3460d3c4 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java @@ -0,0 +1,86 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.datatypes.StringFieldValue; + +/** + * Convenience superclass of document processor test cases + * + * @author bratseth + */ +public abstract class DocumentProcessingAbstractTestCase extends junit.framework.TestCase { + + public DocumentProcessingAbstractTestCase(String name) { + super(name); + } + + /** + * Asserts that a document processing service works + */ + protected void assertProcessingWorks(DocprocService service) { + // Create documents + DocumentType type = new DocumentType("test"); + type.addField("test", DataType.STRING); + DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:test:test:1")); + DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:test:test:2")); + DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:test:test:3")); + + // Process them + service.process(put1); + service.process(put2); + service.process(put3); + while (service.doWork()) {} + + // Verify + assertEquals(new StringFieldValue("done"), put1.getDocument().getFieldValue("test")); + assertEquals(new StringFieldValue("done"), put2.getDocument().getFieldValue("test")); + assertEquals(new StringFieldValue("done"), put3.getDocument().getFieldValue("test")); + } + + public static class TestDocumentProcessor1 extends DocumentProcessor { + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + if (processing.getVariable("processor1") == null) { + processing.setVariable("processor1", "called"); + return Progress.LATER; + } + processing.setVariable("processor1", "calledTwice"); + } + return Progress.DONE; + } + } + + public static class TestDocumentProcessor2 extends DocumentProcessor { + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + assertEquals("calledTwice", processing.getVariable("processor1")); + processing.setVariable("processor2", "called"); + } + return Progress.DONE; + } + } + + public static class TestDocumentProcessor3 extends DocumentProcessor { + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + assertEquals("called", processing.getVariable("processor2")); + if (processing.getVariable("processor3") == null) { + processing.setVariable("processor3", "called"); + return Progress.LATER; + } + ((DocumentPut)op).getDocument().setFieldValue("test", new StringFieldValue("done")); + } + return Progress.DONE; + } + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java new file mode 100644 index 00000000000..060f3a9747c --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import org.junit.Test; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class EmptyProcessingTestCase { + + @Test + public void emptyProcessing() { + DocprocService service = new DocprocService("juba"); + DocumentProcessor processor = new IncrementingDocumentProcessor(); + CallStack stack = new CallStack("juba"); + stack.addLast(processor); + service.setCallStack(stack); + service.setInService(true); + + Processing proc = new Processing(); + proc.setServiceName("juba"); + + service.process(proc); + + while (service.doWork()) { } + + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java new file mode 100644 index 00000000000..1b7a005350e --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java @@ -0,0 +1,88 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.datatypes.StringFieldValue; + +/** + * Tests a document processing where some processings fail with an exception + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a> + */ +public class FailingDocumentProcessingTestCase extends junit.framework.TestCase { + + public FailingDocumentProcessingTestCase(String name) { + super(name); + } + + /** + * Tests chaining of some processors, and execution of the processors + * on some documents + */ + public void testFailingProcessing() { + // Set up service programmatically + DocprocService service = new DocprocService("failing"); + DocumentProcessor first = new SettingValueProcessor("done 1"); + DocumentProcessor second = new FailingProcessor("done 2"); + DocumentProcessor third = new SettingValueProcessor("done 3"); + service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third)); + service.setInService(true); + + assertProcessingWorks(service); + } + + protected void assertProcessingWorks(DocprocService service) { + // Create documents + DocumentType type = new DocumentType("test"); + type.addField("test", DataType.STRING); + DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:failing:test:1")); + DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:failing:test:2")); + DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:failing:test:3")); + + // Process them + service.process(put1); + service.process(put2); + service.process(put3); + while (service.doWork()) {} + + // Verify + assertEquals(new StringFieldValue("done 3"), put1.getDocument().getFieldValue("test")); + assertEquals(new StringFieldValue("done 2"), put2.getDocument().getFieldValue("test")); // Due to exception in 2 + assertEquals(new StringFieldValue("done 3"), put3.getDocument().getFieldValue("test")); + } + + public static class SettingValueProcessor extends SimpleDocumentProcessor { + + private String value; + + public SettingValueProcessor(String value) { + this.value = value; + } + + @Override + public void process(DocumentPut put) { + put.getDocument().setFieldValue("test", value); + } + } + + public static class FailingProcessor extends SettingValueProcessor { + + public FailingProcessor(String name) { + super(name); + } + + @Override + public void process(DocumentPut put) { + super.process(put); + if (put.getId().toString().equals("doc:failing:test:2")) { + throw new HandledProcessingException("Failed at receiving document test:2"); + } + } + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java new file mode 100644 index 00000000000..1db47b0dfb2 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java @@ -0,0 +1,94 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.datatypes.StringFieldValue; + +/** + * Tests a document processing where some processings fail with an exception + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M. R. Rosenvinge</a> + */ +public class FailingDocumentProcessingWithoutExceptionTestCase extends junit.framework.TestCase { + + public FailingDocumentProcessingWithoutExceptionTestCase(String name) { + super(name); + } + + /** + * Tests chaining of some processors, and execution of the processors + * on some documents + */ + public void testFailingProcessing() { + // Set up service programmatically + DocprocService service = new DocprocService("failing-no-exception"); + DocumentProcessor first = new SettingValueProcessor("done 1"); + DocumentProcessor second = new FailingProcessor("done 2"); + DocumentProcessor third = new SettingValueProcessor("done 3"); + service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third)); + service.setInService(true); + + assertProcessingWorks(service); + } + + protected void assertProcessingWorks(DocprocService service) { + // Create documents + DocumentType type = new DocumentType("test"); + type.addField("test", DataType.STRING); + DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:woexception:test:1")); + DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:woexception:test:2")); + DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:woexception:test:3")); + + // Process them + service.process(put1); + service.process(put2); + service.process(put3); + while (service.doWork()) {} + + // Verify + assertEquals(new StringFieldValue("done 3"), put1.getDocument().getFieldValue("test")); + assertEquals(new StringFieldValue("done 2"), put2.getDocument().getFieldValue("test")); // Due to PROCESSING_FAILED in 2 + assertEquals(new StringFieldValue("done 3"), put3.getDocument().getFieldValue("test")); + } + + public static class SettingValueProcessor extends DocumentProcessor { + + private String value; + + public SettingValueProcessor(String value) { + this.value = value; + } + + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + ((DocumentPut)op).getDocument().setFieldValue("test", value); + } + return Progress.DONE; + } + } + + public static class FailingProcessor extends SettingValueProcessor { + + public FailingProcessor(String name) { + super(name); + } + + @Override + public Progress process(Processing processing) { + super.process(processing); + for (DocumentOperation op : processing.getDocumentOperations()) { + if (op.getId().toString().equals("doc:woexception:test:2")) { + return DocumentProcessor.Progress.FAILED; + } + } + return DocumentProcessor.Progress.DONE; + } + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java new file mode 100644 index 00000000000..0e84ba04647 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java @@ -0,0 +1,101 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.datatypes.StringFieldValue; + +/** + * Tests a document processing where some processings fail permanently + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M. R. Rosenvinge</a> + */ +public class FailingPermanentlyDocumentProcessingTestCase extends junit.framework.TestCase { + + public FailingPermanentlyDocumentProcessingTestCase(String name) { + super(name); + } + + /** + * Tests chaining of some processors, and execution of the processors + * on some documents + */ + public void testFailingProcessing() { + // Set up service programmatically + DocprocService service = new DocprocService("failing-permanently"); + DocumentProcessor first = new SettingValueProcessor("done 1"); + DocumentProcessor second = new FailingProcessor("done 2"); + DocumentProcessor third = new SettingValueProcessor("done 3"); + service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third)); + service.setInService(true); + + assertProcessingWorks(service); + } + + protected void assertProcessingWorks(DocprocService service) { + // Create documents + DocumentType type = new DocumentType("test"); + type.addField("test", DataType.STRING); + DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:permanentfailure:test:1")); + DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:permanentfailure:test:2")); + DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:permanentfailure:test:3")); + + // Process them + service.process(put1); + service.process(put2); + service.process(put3); + while (service.doWork()) {} + + // Verify + assertEquals(new StringFieldValue("done 3"), put1.getDocument().getFieldValue("test")); + assertEquals(new StringFieldValue("done 2"), put2.getDocument().getFieldValue("test")); // Due to PERMANENT_FAILURE in 2 + assertNull(put3.getDocument().getFieldValue("test")); //service is disabled now + assertFalse(service.isInService()); + + service.setInService(true); + while (service.doWork()) {} + + assertEquals(new StringFieldValue("done 3"), put3.getDocument().getFieldValue("test")); + assertTrue(service.isInService()); + } + + public static class SettingValueProcessor extends DocumentProcessor { + + private String value; + + public SettingValueProcessor(String value) { + this.value = value; + } + + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + ((DocumentPut)op).getDocument().setFieldValue("test", value); + } + return Progress.DONE; + } + } + + public static class FailingProcessor extends SettingValueProcessor { + + public FailingProcessor(String name) { + super(name); + } + + @Override + public Progress process(Processing processing) { + super.process(processing); + for (DocumentOperation op : processing.getDocumentOperations()) { + if (op.getId().toString().equals("doc:permanentfailure:test:2")) { + return DocumentProcessor.Progress.PERMANENT_FAILURE; + } + } + return DocumentProcessor.Progress.DONE; + } + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java new file mode 100644 index 00000000000..184b0aa9e88 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class FailingWithErrorTestCase extends junit.framework.TestCase { + + public void testErrors() { + DocprocService service = new DocprocService("failing"); + DocumentProcessor first = new ErrorThrowingProcessor(); + service.setCallStack(new CallStack().addLast(first)); + service.setInService(true); + + DocumentType type = new DocumentType("test"); + type.addField("test", DataType.STRING); + DocumentPut put = new DocumentPut(type, new DocumentId("doc:failing:test:1")); + put.getDocument().setFieldValue("test", "foobar"); + + service.process(put); + assertEquals(1, service.getQueueSize()); + try { + while (service.doWork()) { } + fail("Should have gotten OOME here"); + } catch (Throwable t) { + //we don't want a finally block in doWork()! + assertEquals(0, service.getQueueSize()); + } + assertEquals(0, service.getQueueSize()); + + } + + private class ErrorThrowingProcessor extends DocumentProcessor { + @Override + public Progress process(Processing processing) { + throw new OutOfMemoryError("Einar is out of mem"); + } + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/IncrementingDocumentProcessor.java b/docproc/src/test/java/com/yahoo/docproc/IncrementingDocumentProcessor.java new file mode 100644 index 00000000000..660daec67d8 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/IncrementingDocumentProcessor.java @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DocumentPut; + +/** + * Document processor used for a simple, silly test. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class IncrementingDocumentProcessor extends SimpleDocumentProcessor { + int counter = 0; + + @Override + public void process(DocumentPut put) { + System.err.println(counter + " DocumentPut: " + put); + counter++; + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java b/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java new file mode 100644 index 00000000000..9e5f7a43c74 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class NotAcceptingNewProcessingsTestCase extends junit.framework.TestCase { + + public void testNotAccepting() { + DocprocService service = new DocprocService("habla"); + service.setCallStack(new CallStack()); + service.setInService(true); + + service.process(new Processing()); + assertEquals(1, service.getQueueSize()); + + service.setAcceptingNewProcessings(false); + + try { + service.process(new Processing()); + fail("Should have gotten IllegalStateException here"); + } catch (IllegalStateException ise) { + //ok! + } + assertEquals(1, service.getQueueSize()); + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/ProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/ProcessingTestCase.java new file mode 100644 index 00000000000..296c8d163e5 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/ProcessingTestCase.java @@ -0,0 +1,50 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DocumentOperation; +import org.junit.Test; + +import java.util.Iterator; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertThat; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + * @since 5.1.10 + */ +public class ProcessingTestCase { + + @Test + public void serviceName() { + assertThat(new Processing().getServiceName(), is("default")); + assertThat(new Processing("foobar", (DocumentOperation) null, null).getServiceName(), is("foobar")); + } + + @Test + public void contextVariables() { + Processing p = new Processing(); + + p.setVariable("foo", "banana"); + p.setVariable("bar", "apple"); + + Iterator<Map.Entry<String, Object>> it = p.getVariableAndNameIterator(); + assertThat(it.hasNext(), is(true)); + assertThat(it.next(), not(nullValue())); + assertThat(it.hasNext(), is(true)); + assertThat(it.next(), not(nullValue())); + assertThat(it.hasNext(), is(false)); + + assertThat(p.hasVariable("foo"), is(true)); + assertThat(p.hasVariable("bar"), is(true)); + assertThat(p.hasVariable("baz"), is(false)); + + assertThat(p.removeVariable("foo"), is("banana")); + + p.clearVariables(); + + it = p.getVariableAndNameIterator(); + assertThat(it.hasNext(), is(false)); + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java b/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java new file mode 100644 index 00000000000..d8398f1fe47 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java @@ -0,0 +1,103 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.Field; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.document.update.AssignValueUpdate; +import com.yahoo.document.update.FieldUpdate; +import com.yahoo.document.update.ValueUpdate; + +import java.util.List; +import java.util.StringTokenizer; + +/** + * Simple test case for testing that processing of both documents and + * document updates works. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class ProcessingUpdateTestCase extends junit.framework.TestCase { + + private DocumentPut put; + private DocumentUpdate update; + + private DocumentTypeManager dtm; + + public void testProcessingUpdates() { + DocumentType articleType = new DocumentType("article"); + articleType.addField(new Field("body", DataType.STRING, true)); + articleType.addField(new Field("title", DataType.STRING, true)); + dtm = new DocumentTypeManager(); + dtm.registerDocumentType(articleType); + + put = new DocumentPut(articleType, "doc:banana:apple"); + put.getDocument().setFieldValue("body", "this is the body of the article, blah blah blah"); + FieldUpdate upd = FieldUpdate.createAssign(articleType.getField("body"), new StringFieldValue("this is the updated body of the article, blahdi blahdi blahdi")); + update = new DocumentUpdate(articleType, new DocumentId("doc:grape:orange")); + update.addFieldUpdate(upd); + + DocprocService service = new DocprocService("update"); + DocumentProcessor firstP = new TitleDocumentProcessor(); + service.setCallStack(new CallStack().addLast(firstP)); + service.setInService(true); + + + + Processing p = new Processing(); + p.addDocumentOperation(put); + p.addDocumentOperation(update); + + service.process(p); + + while (service.doWork()) { } + + List<DocumentOperation> operations = p.getDocumentOperations(); + Document first = ((DocumentPut)operations.get(0)).getDocument(); + assertEquals(new StringFieldValue("this is the body of the article, blah blah blah"), first.getFieldValue("body")); + assertEquals(new StringFieldValue("body blah blah blah "), first.getFieldValue("title")); + + DocumentUpdate second = (DocumentUpdate) operations.get(1); + FieldUpdate firstUpd = second.getFieldUpdate(0); + assertEquals(ValueUpdate.ValueUpdateClassID.ASSIGN, firstUpd.getValueUpdate(0).getValueUpdateClassID()); + assertEquals(new StringFieldValue("this is the updated body of the article, blahdi blahdi blahdi"), firstUpd.getValueUpdate(0) + .getValue()); + + FieldUpdate secondUpd = second.getFieldUpdate(1); + assertEquals(ValueUpdate.ValueUpdateClassID.ASSIGN, secondUpd.getValueUpdate(0).getValueUpdateClassID()); + assertEquals(new StringFieldValue("body blahdi blahdi blahdi "), secondUpd.getValueUpdate(0).getValue()); + } + + private class TitleDocumentProcessor extends SimpleDocumentProcessor { + @Override + public void process(DocumentPut doc) { + put.getDocument().setFieldValue("title", extractTitle(put.getDocument().getFieldValue("body").toString())); + } + + @Override + public void process(DocumentUpdate upd) { + FieldUpdate bodyFieldUpdate = upd.getFieldUpdate("body"); + AssignValueUpdate au = (AssignValueUpdate) bodyFieldUpdate.getValueUpdate(0); + FieldUpdate titleUpd = FieldUpdate.createAssign(upd.getType().getField("title"), new StringFieldValue(extractTitle(((StringFieldValue) au.getValue()).getString()))); + upd.addFieldUpdate(titleUpd); + } + + private String extractTitle(String body) { + if (body == null) return null; + StringTokenizer strTok = new StringTokenizer(body, " "); + String title = ""; + while (strTok.hasMoreTokens()) { + String word = strTok.nextToken(); + if (word.startsWith("b")) title += word + " "; + } + return title; + } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java new file mode 100644 index 00000000000..8ca553b9290 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java @@ -0,0 +1,62 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.component.chain.dependencies.After; +import com.yahoo.docproc.Accesses.Field.Tree; + +/** + * Tests the basic operation of the docproc service + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a> + */ +public class SimpleDocumentProcessingTestCase extends DocumentProcessingAbstractTestCase { + + public SimpleDocumentProcessingTestCase(String name) { + super(name); + } + + /** + * Tests chaining of some processors, and execution of the processors + * on some documents + */ + public void testSimpleProcessing() { + // Set up service programmatically + DocprocService service = new DocprocService("simple"); + DocumentProcessor first = new TestDocumentProcessor1(); + DocumentProcessor second = new TestDocumentProcessor2(); + DocumentProcessor third = new TestDocumentProcessor3(); + service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third)); + service.setInService(true); + + assertProcessingWorks(service); + } + + public void testAnnotationBasic() { + Accesses accesses = MyDocProc.class.getAnnotation(Accesses.class); + After after = MyDocProc.class.getAnnotation(After.class); + assertNotNull(accesses); + assertNotNull(after); + assertEquals(after.value()[0], "MyOtherDocProc"); + assertEquals(after.value()[1], "AnotherDocProc"); + assertEquals(accesses.value()[0].name(), "myField1"); + assertEquals(accesses.value()[1].annotations()[0].consumes()[0], "word"); + assertEquals(accesses.value()[1].annotations()[0].consumes()[1], "phrase"); + } + + @Accesses({ @Accesses.Field(name = "myField1", dataType = "string", + description = "What is done on field myField1", + annotations = @Tree(produces = { "word", "sentence" }, consumes = "word")), + @Accesses.Field(name = "myField2", dataType = "string", + description = "What is done on field myField2", + annotations = @Tree(consumes = { "word", "phrase" })) }) + @After({ "MyOtherDocProc", "AnotherDocProc" }) + public static class MyDocProc extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + // TODO Auto-generated method stub + return null; + } + } +} + diff --git a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java new file mode 100644 index 00000000000..aae98dd22e7 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java @@ -0,0 +1,152 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.container.StatisticsConfig; +import com.yahoo.docproc.jdisc.metric.NullMetric; +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentRemove; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.document.idstring.UserDocIdString; +import com.yahoo.statistics.StatisticsImpl; +import org.junit.Test; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.nullValue; +import static org.junit.Assert.assertThat; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class SimpleDocumentProcessorTestCase { + + private static DocprocService setupDocprocService(SimpleDocumentProcessor processor) { + CallStack stack = new CallStack("default", new StatisticsImpl(new StatisticsConfig(new StatisticsConfig.Builder())), new NullMetric()); + stack.addLast(processor); + DocprocService service = new DocprocService("default"); + service.setCallStack(stack); + service.setInService(true); + return service; + } + + private static Processing getProcessing(DocumentOperation... operations) { + Processing processing = new Processing(); + + for (DocumentOperation op : operations) { + processing.addDocumentOperation(op); + } + + return processing; + } + + @Test + public void requireThatProcessingMultipleOperationsWork() { + DocumentType type = new DocumentType("foobar"); + type.addField("title", DataType.STRING); + + Processing p = getProcessing(new DocumentPut(type, "doc:this:is:a:document"), + new DocumentUpdate(type, "doc:this:is:an:update"), + new DocumentRemove(new DocumentId("doc:this:is:a:remove"))); + + DocprocService service = setupDocprocService(new VerySimpleDocumentProcessor()); + service.getExecutor().process(p); + + assertThat(p.getDocumentOperations().size(), is(3)); + assertThat(p.getDocumentOperations().get(0) instanceof DocumentPut, is(true)); + assertThat(((DocumentPut) p.getDocumentOperations().get(0)).getDocument().getFieldValue("title").getWrappedValue(), + is("processed")); + assertThat(p.getDocumentOperations().get(1) instanceof DocumentUpdate, is(true)); + assertThat(p.getDocumentOperations().get(2) instanceof DocumentRemove, is(true)); + assertThat(p.getDocumentOperations().get(2).getId().toString(), + is("userdoc:foobar:1234:something")); + } + + @Test + public void requireThatProcessingSingleOperationWorks() { + DocumentType type = new DocumentType("foobar"); + type.addField("title", DataType.STRING); + + Processing p = getProcessing(new DocumentPut(type, "doc:this:is:a:document")); + DocprocService service = setupDocprocService(new VerySimpleDocumentProcessor()); + service.getExecutor().process(p); + + assertThat(p.getDocumentOperations().size(), is(1)); + assertThat(p.getDocumentOperations().get(0) instanceof DocumentPut, is(true)); + assertThat(((DocumentPut) p.getDocumentOperations().get(0)).getDocument().getFieldValue("title").getWrappedValue(), + is("processed")); + } + + @Test + public void requireThatThrowingTerminatesIteration() { + DocumentType type = new DocumentType("foobar"); + type.addField("title", DataType.STRING); + + Processing p = getProcessing(new DocumentPut(type, "doc:this:is:a:document"), + new DocumentRemove(new DocumentId("doc:this:is:a:remove")), + new DocumentPut(type, "doc:this:is:a:document2")); + + DocprocService service = setupDocprocService(new SimpleDocumentProcessorThrowingOnRemovesAndUpdates()); + try { + service.getExecutor().process(p); + } catch (RuntimeException re) { + //ok + } + + assertThat(p.getDocumentOperations().size(), is(3)); + assertThat(p.getDocumentOperations().get(0) instanceof DocumentPut, is(true)); + assertThat(((DocumentPut) p.getDocumentOperations().get(0)).getDocument().getFieldValue("title").getWrappedValue(), + is("processed")); + assertThat(p.getDocumentOperations().get(1) instanceof DocumentRemove, is(true)); + assertThat(p.getDocumentOperations().get(1).getId().toString(), + is("doc:this:is:a:remove")); + assertThat(p.getDocumentOperations().get(2) instanceof DocumentPut, is(true)); + assertThat(((DocumentPut) p.getDocumentOperations().get(2)).getDocument().getFieldValue("title"), + nullValue()); + + + } + + public static class VerySimpleDocumentProcessor extends SimpleDocumentProcessor { + + @Override + public void process(DocumentPut put) { + put.getDocument().setFieldValue("title", new StringFieldValue("processed")); + } + + @Override + public void process(DocumentRemove remove) { + remove.getId().setId(new UserDocIdString("foobar", 1234L, "something")); + } + + @Override + public void process(DocumentUpdate update) { + update.clearFieldUpdates(); + } + + } + + public static class SimpleDocumentProcessorThrowingOnRemovesAndUpdates extends SimpleDocumentProcessor { + + @Override + public void process(DocumentPut put) { + put.getDocument().setFieldValue("title", new StringFieldValue("processed")); + } + + @Override + public void process(DocumentRemove remove) { + throw new RuntimeException("oh no."); + } + + @Override + public void process(DocumentUpdate update) { + throw new RuntimeException("oh no."); + } + + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java b/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java new file mode 100644 index 00000000000..076a8610c4a --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java @@ -0,0 +1,97 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc; + +import com.yahoo.document.DataType; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentOperation; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class TransientFailureTestCase extends junit.framework.TestCase { + DocumentType type; + + public void setUp() { + type = new DocumentType("test"); + type.addField("boo", DataType.STRING); + } + + public void testTransientFailures() { + DocprocService service = new DocprocService("transfail"); + CallStack stack = new CallStack(); + stack.addNext(new OkDocProc()).addNext(new TransientFailDocProc()); + service.setCallStack(stack); + service.setInService(true); + + EndpointSupportingTransientFailures endpoint = new EndpointSupportingTransientFailures(); + + DocumentPut put; + + put = new DocumentPut(type, new DocumentId("doc:transfail:bad")); + service.process(put, endpoint); + while (service.doWork()) { } + assertEquals(0, endpoint.numOk); + assertEquals(1, endpoint.numTransientFail); + assertEquals(0, endpoint.numFail); + + put = new DocumentPut(type, new DocumentId("doc:transfail:verybad")); + service.process(put, endpoint); + while (service.doWork()) { } + assertEquals(0, endpoint.numOk); + assertEquals(1, endpoint.numTransientFail); + assertEquals(1, endpoint.numFail); + + put = new DocumentPut(type, new DocumentId("doc:transfail:good")); + service.process(put, endpoint); + while (service.doWork()) { } + assertEquals(1, endpoint.numOk); + assertEquals(1, endpoint.numTransientFail); + assertEquals(1, endpoint.numFail); + + put = new DocumentPut(type, new DocumentId("doc:transfail:veryverybad")); + service.process(put, endpoint); + while (service.doWork()) { } + assertEquals(1, endpoint.numOk); + assertEquals(1, endpoint.numTransientFail); + assertEquals(2, endpoint.numFail); + } + + private static class OkDocProc extends SimpleDocumentProcessor { + } + + private static class TransientFailDocProc extends DocumentProcessor { + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + if (op.getId().toString().equals("doc:transfail:bad")) { + throw new TransientFailureException("sorry, try again later"); + } else if (op.getId().toString().equals("doc:transfail:verybad")) { + return Progress.FAILED; + } else if (op.getId().toString().equals("doc:transfail:veryverybad")) { + return Progress.PERMANENT_FAILURE; + } + } + return Progress.DONE; + } + } + + private static class EndpointSupportingTransientFailures implements ProcessingEndpoint { + private volatile int numOk = 0; + private volatile int numTransientFail = 0; + private volatile int numFail = 0; + + public void processingDone(Processing processing) { + numOk++; + } + + public void processingFailed(Processing processing, Exception exception) { + if (exception instanceof TransientFailureException) { + numTransientFail++; + } else { + numFail++; + } + } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java new file mode 100644 index 00000000000..f928ec4febd --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java @@ -0,0 +1,83 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocprocThreadPoolExecutorTestCase { + private final Set<Long> threadIds = Collections.synchronizedSet(new HashSet<Long>()); + + @Test + public void threadPool() throws InterruptedException { + int numThreads = 8; + int numTasks = 200; + + LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<>(); + DocprocThreadManager mgr = new DocprocThreadManager(1000l); + DocprocThreadPoolExecutor pool = new DocprocThreadPoolExecutor(numThreads, q, mgr); + + List<MockedDocumentProcessingTask> tasks = new ArrayList<>(numTasks); + for (int i = 0; i < numTasks; i++) { + tasks.add(new MockedDocumentProcessingTask()); + } + + for (int i = 0; i < numTasks; i++) { + pool.execute(tasks.get(i)); + } + pool.shutdown(); + pool.awaitTermination(120L, TimeUnit.SECONDS); + + for (int i = 0; i < numTasks; i++) { + assertTrue(tasks.get(i).hasBeenRun()); + } + + System.err.println(threadIds); + assertEquals(numThreads, threadIds.size()); + } + + private class MockedDocumentProcessingTask extends DocumentProcessingTask { + private boolean hasBeenRun = false; + + public MockedDocumentProcessingTask() { + super(null, null, null); + } + + @Override + public void run() { + threadIds.add(Thread.currentThread().getId()); + System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " running task " + this); + for (int i = 0; i < 100000; i++) { + Math.sin((double) (System.currentTimeMillis() / 10000L)); + } + System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " DONE task " + this); + hasBeenRun = true; + } + + @Override + public int getApproxSize() { + return 333; + } + + @Override + public String toString() { + return "seqNum " + getSeqNum(); + } + + public boolean hasBeenRun() { + return hasBeenRun; + } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java new file mode 100644 index 00000000000..271ad09ab1d --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java @@ -0,0 +1,230 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.collections.Pair; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.*; +import com.yahoo.document.datatypes.FieldValue; +import com.yahoo.document.datatypes.IntegerFieldValue; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.document.update.FieldUpdate; +import com.yahoo.documentapi.messagebus.protocol.BatchDocumentUpdateMessage; +import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.vdslib.DocumentList; +import com.yahoo.vdslib.Entry; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocumentProcessingHandlerAllMessageTypesTestCase extends DocumentProcessingHandlerTestBase { + + private static final String FOOBAR = "foobar"; + private final DocumentType type; + + public DocumentProcessingHandlerAllMessageTypesTestCase() { + this.type = new DocumentType("baz"); + this.type.addField(new Field("blahblah", DataType.STRING)); + this.type.addField(new Field("defaultWait", DataType.INT)); + this.type.addField(new Field("customWait", DataType.INT)); + } + + @Test + public void testMessages() throws InterruptedException { + get(); + put(); + remove(); + update(); + batchDocumentUpdate(); + } + + private void get() throws InterruptedException { + GetDocumentMessage message = new GetDocumentMessage(new DocumentId("doc:this:is:a:test"), "fieldset?"); + + assertTrue(sendMessage(FOOBAR, message)); + + Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(result); + remoteServer.ackMessage(result); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + + assertThat(result, instanceOf(GetDocumentMessage.class)); + + assertFalse(reply.hasErrors()); + } + + + private void put() throws InterruptedException { + Document document = new Document(getType(), "doc:baz:foo"); + document.setFieldValue("blahblah", new StringFieldValue("This is a test.")); + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(document)); + + assertTrue(sendMessage(FOOBAR, message)); + + Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(result); + remoteServer.ackMessage(result); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + + assertThat(result, instanceOf(PutDocumentMessage.class)); + PutDocumentMessage outputMsg = (PutDocumentMessage) result; + assertThat(((StringFieldValue) outputMsg.getDocumentPut().getDocument().getFieldValue("blahblah")).getString(), is("THIS IS A TEST.")); + + assertFalse(reply.hasErrors()); + } + + private void remove() throws InterruptedException { + RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:12345:6789")); + + assertTrue(sendMessage(FOOBAR, message)); + + Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(result); + remoteServer.ackMessage(result); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + + assertThat(result, instanceOf(RemoveDocumentMessage.class)); + RemoveDocumentMessage outputMsg = (RemoveDocumentMessage) result; + assertThat(outputMsg.getDocumentId().toString(), is("doc:12345:6789")); + + assertFalse(reply.hasErrors()); + } + + private void update() throws InterruptedException { + DocumentUpdate documentUpdate = new DocumentUpdate(getType(), "doc:baz:foo"); + UpdateDocumentMessage message = new UpdateDocumentMessage(documentUpdate); + + assertTrue(sendMessage(FOOBAR, message)); + + Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(result); + remoteServer.ackMessage(result); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + + assertThat(result, instanceOf(UpdateDocumentMessage.class)); + UpdateDocumentMessage outputMsg = (UpdateDocumentMessage) result; + assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:baz:foo")); + + assertFalse(reply.hasErrors()); + } + + private void batchDocumentUpdate() throws InterruptedException { + DocumentUpdate doc1 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:multi:1")); + DocumentUpdate doc2 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:multi:2")); + + Field testField = getType().getField("blahblah"); + doc1.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("1 not yet processed"))); + doc2.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("2 not yet processed"))); + + BatchDocumentUpdateMessage message = new BatchDocumentUpdateMessage(12345); + message.addUpdate(doc1); + message.addUpdate(doc2); + + assertTrue(sendMessage(FOOBAR, message)); + + Message remote1 = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertTrue(remote1 instanceof UpdateDocumentMessage); + remoteServer.ackMessage(remote1); + assertNull(driver.client().awaitReply(100, TimeUnit.MILLISECONDS)); + + Message remote2 = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertTrue(remote2 instanceof UpdateDocumentMessage); + remoteServer.ackMessage(remote2); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + } + + @Override + public List<Pair<String,CallStack>> getCallStacks() { + CallStack stack = new CallStack(); + stack.addLast(new YallaDocumentProcessor()); + stack.addLast(new WaitingDefaultDocumentProcessor()); + stack.addLast(new WaitingCustomDocumentProcessor()); + + ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(1); + stacks.add(new Pair<>(FOOBAR, stack)); + return stacks; + } + + @Override + public DocumentType getType() { + return type; + } + + private static class YallaDocumentProcessor extends DocumentProcessor { + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + if (op instanceof DocumentPut) { + Document doc = ((DocumentPut) op).getDocument(); + for (Field f : doc.getDataType().fieldSet()) { + FieldValue val = doc.getFieldValue(f); + if (val instanceof StringFieldValue) { + StringFieldValue sf = (StringFieldValue) val; + doc.setFieldValue(f, new StringFieldValue(sf.getString().toUpperCase())); + } + } + } + //don't touch updates or removes + } + return Progress.DONE; + } + } + + private static abstract class WaitingDocumentProcessor extends DocumentProcessor { + protected Progress laterProgress; + protected String waitKey; + + @Override + public Progress process(Processing processing) { + for (DocumentOperation op : processing.getDocumentOperations()) { + if (op instanceof DocumentPut) { + Document doc = ((DocumentPut) op).getDocument(); + if (doc.getFieldValue(waitKey) == null) { + System.out.println(this.getClass().getSimpleName() + ": returning LATER for " + doc); + doc.setFieldValue(waitKey, new IntegerFieldValue(1)); + return laterProgress; + } + } + //don't touch updates or removes + } + return Progress.DONE; + } + } + + private static class WaitingDefaultDocumentProcessor extends WaitingDocumentProcessor { + private WaitingDefaultDocumentProcessor() { + laterProgress = Progress.LATER; + waitKey = "defaultWait"; + } + } + + private static class WaitingCustomDocumentProcessor extends WaitingDocumentProcessor { + private WaitingCustomDocumentProcessor() { + laterProgress = Progress.later(60); + waitKey = "customWait"; + } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerBasicTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerBasicTestCase.java new file mode 100644 index 00000000000..d308d0a484a --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerBasicTestCase.java @@ -0,0 +1,79 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.collections.Pair; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.SimpleDocumentProcessor; +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.Field; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocumentProcessingHandlerBasicTestCase extends DocumentProcessingHandlerTestBase { + private DocumentType type; + + public DocumentProcessingHandlerBasicTestCase() { + this.type = new DocumentType("yalla"); + this.type.addField(new Field("blahblah", DataType.STRING)); + } + + @Test + public void testPut() throws InterruptedException { + Document document = new Document(getType(), "doc:yalla:balla"); + document.setFieldValue("blahblah", new StringFieldValue("This is a test.")); + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(document)); + + assertTrue(sendMessage("foobar", message)); + + Message msg = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(msg); + remoteServer.ackMessage(msg); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + + assertThat((msg instanceof PutDocumentMessage), is(true)); + PutDocumentMessage put = (PutDocumentMessage) msg; + Document outDoc = put.getDocumentPut().getDocument(); + + assertThat(document, equalTo(outDoc)); + assertFalse(reply.hasErrors()); + } + + @Override + public List<Pair<String,CallStack>> getCallStacks() { + CallStack stack = new CallStack(); + stack.addLast(new TestDocumentProcessor()); + + ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(1); + stacks.add(new Pair<>("foobar", stack)); + return stacks; + } + + @Override + public DocumentType getType() { + return type; + } + + public static class TestDocumentProcessor extends SimpleDocumentProcessor { + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java new file mode 100644 index 00000000000..b637f35af4f --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java @@ -0,0 +1,215 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.collections.Pair; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.*; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.vdslib.DocumentList; +import com.yahoo.vdslib.Entry; +import org.junit.Test; + +import java.util.*; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHandlerTestBase { + + private static final String TOMANYALLINSAMEBUCKET = "tomanyallinsamebucket"; + private static final String TOMANYSOMEINSAMEBUCKET = "tomanysomeinsamebucket"; + private static final String TOMANY = "many"; + private static final String TOONE = "toone"; + private static final String TOZERO = "tozero"; + private final DocumentType type; + + public DocumentProcessingHandlerForkTestCase() { + this.type = new DocumentType("baz"); + this.type.addField(new Field("blahblah", DataType.STRING)); + } + + @Override + public DocumentType getType() { + return type; + } + + @Test + public void testMessages() throws InterruptedException { + putToFourPuts(); + putToManyAllInSameBucket(); + putToManySomeInSameBucket(); + putToOne(); + putToZero(); + } + + private void putToManyAllInSameBucket() throws InterruptedException { + assertPutMessages(createPutDocumentMessage(), TOMANYALLINSAMEBUCKET, + "userdoc:123456:11111:foo:er:bra", + "userdoc:123456:11111:foo:trallala", + "userdoc:123456:11111:foo:a"); + } + + private void putToManySomeInSameBucket() throws InterruptedException { + assertPutMessages(createPutDocumentMessage(), TOMANYSOMEINSAMEBUCKET, + "userdoc:123456:7890:bar:er:bra", + "doc:foo:bar:er:ja", + "userdoc:567890:1234:a", + "doc:foo:bar:hahahhaa", + "userdoc:123456:7890:a:a", + "doc:foo:bar:aa", + "userdoc:567890:1234:bar:ala", + "doc:foo:bar:sdfgsaa", + "userdoc:123456:7890:bar:tralsfa", + "doc:foo:bar:dfshaa"); + } + + private void putToFourPuts() throws InterruptedException { + assertPutMessages(createPutDocumentMessage(), TOMANY, + "doc:foo:bar:er:bra", + "doc:foo:bar:er:ja", + "doc:foo:bar:hahahhaa", + "doc:foo:bar:trallala"); + } + + private void putToOne() throws InterruptedException { + assertPutMessages(createPutDocumentMessage(), TOONE, + "doc:baz:bar"); + } + + private void putToZero() throws InterruptedException { + assertTrue(sendMessage(TOZERO, createPutDocumentMessage())); + + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertTrue(reply instanceof WriteDocumentReply); + assertFalse(reply.hasErrors()); + } + + @Override + protected List<Pair<String, CallStack>> getCallStacks() { + ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(5); + stacks.add(new Pair<>(TOMANYALLINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsAllInSameBucketProcessor()))); + stacks.add(new Pair<>(TOMANYSOMEINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsSomeInSameBucketProcessor()))); + stacks.add(new Pair<>(TOMANY, new CallStack().addLast(new OneToManyDocumentsProcessor()))); + stacks.add(new Pair<>(TOONE, new CallStack().addLast(new OneToOneDocumentsProcessor()))); + stacks.add(new Pair<>(TOZERO, new CallStack().addLast(new OneToZeroDocumentsProcessor()))); + return stacks; + } + + protected PutDocumentMessage createPutDocumentMessage() { + Document document = new Document(getType(), "doc:baz:bar"); + document.setFieldValue("blahblah", new StringFieldValue("This is a test.")); + return new PutDocumentMessage(new DocumentPut(document)); + } + + private void assertPutMessages(DocumentMessage msg, String route, String... expected) throws InterruptedException { + msg.getTrace().setLevel(9); + assertTrue(sendMessage(route, msg)); + + String[] actual = new String[expected.length]; + for (int i = 0; i < expected.length; ++i) { + Message remoteMsg = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertTrue(remoteMsg instanceof PutDocumentMessage); + remoteMsg.getTrace().trace(1, "remoteServer.ack(" + expected[i] + ")"); + remoteServer.ackMessage(remoteMsg); + actual[i] = ((PutDocumentMessage)remoteMsg).getDocumentPut().getDocument().getId().toString(); + } + assertNull(remoteServer.awaitMessage(100, TimeUnit.MILLISECONDS)); + + Arrays.sort(expected); + Arrays.sort(actual); + assertArrayEquals(expected, actual); + + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + String trace = reply.getTrace().toString(); + for (String documentId : expected) { + assertTrue("missing trace for document '" + documentId + "'\n" + trace, + trace.contains("remoteServer.ack(" + documentId + ")")); + } + if (expected.length == 1) { + assertFalse("unexpected fork in trace for single document\n" + trace, + trace.contains("<fork>")); + } else { + assertTrue("missing fork in trace for " + expected.length + " split\n" + trace, + trace.contains("<fork>")); + } + } + + public class OneToOneDocumentsProcessor extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + return Progress.DONE; + } + } + + public class OneToManyDocumentsProcessor extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + List<DocumentOperation> operations = processing.getDocumentOperations(); + operations.clear(); + operations.add(new DocumentPut(type, "doc:foo:bar:er:bra")); + operations.add(new DocumentPut(type, "doc:foo:bar:er:ja")); + operations.add(new DocumentPut(type, "doc:foo:bar:trallala")); + operations.add(new DocumentPut(type, "doc:foo:bar:hahahhaa")); + return Progress.DONE; + } + } + + public class OneToZeroDocumentsProcessor extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + processing.getDocumentOperations().clear(); + return Progress.DONE; + } + } + + public class OneToManyDocumentsSomeInSameBucketProcessor extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + List<DocumentOperation> operations = processing.getDocumentOperations(); + operations.clear(); + operations.add(new DocumentPut(type, "userdoc:123456:7890:bar:er:bra")); + operations.add(new DocumentPut(type, "doc:foo:bar:er:ja")); + operations.add(new DocumentPut(type, "userdoc:567890:1234:a")); + operations.add(new DocumentPut(type, "doc:foo:bar:hahahhaa")); + operations.add(new DocumentPut(type, "userdoc:123456:7890:a:a")); + operations.add(new DocumentPut(type, "doc:foo:bar:aa")); + operations.add(new DocumentPut(type, "userdoc:567890:1234:bar:ala")); + operations.add(new DocumentPut(type, "doc:foo:bar:sdfgsaa")); + operations.add(new DocumentPut(type, "userdoc:123456:7890:bar:tralsfa")); + operations.add(new DocumentPut(type, "doc:foo:bar:dfshaa")); + return Progress.DONE; + } + + } + + public class OneToManyDocumentsAllInSameBucketProcessor extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + List<DocumentOperation> docs = processing.getDocumentOperations(); + docs.clear(); + docs.add(new DocumentPut(type, "userdoc:123456:11111:foo:er:bra")); + docs.add(new DocumentPut(type, "userdoc:123456:11111:foo:trallala")); + docs.add(new DocumentPut(type, "userdoc:123456:11111:foo:a")); + return Progress.DONE; + } + + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java new file mode 100644 index 00000000000..4af0b148164 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java @@ -0,0 +1,131 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.collections.Pair; +import com.yahoo.component.ComponentId; +import com.yahoo.component.provider.ComponentRegistry; +import com.yahoo.container.core.document.ContainerDocumentConfig; +import com.yahoo.container.jdisc.messagebus.MbusServerProvider; +import com.yahoo.container.jdisc.messagebus.SessionCache; +import com.yahoo.docproc.AbstractConcreteDocumentFactory; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocprocService; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext; + +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.messagebus.loadtypes.LoadType; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jdisc.AbstractResource; +import com.yahoo.jdisc.ReferencedResource; +import com.yahoo.jdisc.application.ContainerBuilder; +import com.yahoo.messagebus.Protocol; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.jdisc.MbusClient; +import com.yahoo.messagebus.jdisc.test.RemoteServer; +import com.yahoo.messagebus.jdisc.test.ServerTestDriver; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.messagebus.shared.SharedSourceSession; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public abstract class DocumentProcessingHandlerTestBase { + + protected DocumentProcessingHandler handler; + protected ServerTestDriver driver; + protected RemoteServer remoteServer; + protected DocumentTypeManager documentTypeManager = new DocumentTypeManager(); + SessionCache sessionCache; + private List<MbusServerProvider> serviceProviders = new ArrayList<>(); + + @Before + public void createHandler() { + documentTypeManager.register(getType()); + + Protocol protocol = new DocumentProtocol(documentTypeManager); + + driver = ServerTestDriver.newInactiveInstanceWithProtocol(protocol); + + sessionCache = + new SessionCache("raw:", driver.client().slobrokId(), "test", "raw:", null, "raw:", documentTypeManager); + + ContainerBuilder builder = driver.parent().newContainerBuilder(); + ComponentRegistry<DocprocService> registry = new ComponentRegistry<>(); + + handler = new DocumentProcessingHandler(registry, + new ComponentRegistry<>(), + new ComponentRegistry<>(), + new DocumentProcessingHandlerParameters(). + setDocumentTypeManager(documentTypeManager). + setContainerDocumentConfig(new ContainerDocumentConfig(new ContainerDocumentConfig.Builder()))); + builder.serverBindings().bind("mbus://*/*", handler); + + ReferencedResource<SharedSourceSession> sessionRef = sessionCache.retainSource(new SourceSessionParams()); + MbusClient sourceClient = new MbusClient(sessionRef.getResource()); + builder.clientBindings().bind("mbus://*/source", sourceClient); + builder.clientBindings().bind("mbus://*/" + MbusRequestContext.internalNoThrottledSource, sourceClient); + sourceClient.start(); + + List<Pair<String, CallStack>> callStacks = getCallStacks(); + List<AbstractResource> resources = new ArrayList<>(); + for (Pair<String, CallStack> callStackPair : callStacks) { + DocprocService service = new DocprocService(callStackPair.getFirst()); + service.setCallStack(callStackPair.getSecond()); + service.setInService(true); + + ComponentId serviceId = new ComponentId(service.getName()); + registry.register(serviceId, service); + + ComponentId sessionName = ComponentId.fromString("chain." + serviceId); + MbusServerProvider serviceProvider = new MbusServerProvider(sessionName, sessionCache, driver.parent()); + serviceProvider.get().start(); + + serviceProviders.add(serviceProvider); + + MbusClient intermediateClient = new MbusClient(serviceProvider.getSession()); + builder.clientBindings().bind("mbus://*/" + sessionName.stringValue(), intermediateClient); + intermediateClient.start(); + resources.add(intermediateClient); + } + + driver.parent().activateContainer(builder); + sessionRef.getReference().close(); + sourceClient.release(); + + for (AbstractResource resource : resources) { + resource.release(); + } + + remoteServer = RemoteServer.newInstance(driver.client().slobrokId(), "foobar", protocol); + } + + @After + public void destroy() { + for (MbusServerProvider serviceProvider : serviceProviders) { + serviceProvider.deconstruct(); + } + driver.close(); + remoteServer.close(); + } + + protected abstract List<Pair<String, CallStack>> getCallStacks(); + + protected abstract DocumentType getType(); + + public boolean sendMessage(String destinationChainName, DocumentMessage msg) { + msg.setRoute(Route.parse("test/chain." + destinationChainName + " " + remoteServer.connectionSpec())); + msg.setPriority(DocumentProtocol.Priority.HIGH_1); + msg.setLoadType(LoadType.DEFAULT); + msg.getTrace().setLevel(9); + msg.setTimeRemaining(60 * 1000); + return driver.client().sendMessage(msg).isAccepted(); + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTransformingMessagesTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTransformingMessagesTestCase.java new file mode 100644 index 00000000000..f3b80506021 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTransformingMessagesTestCase.java @@ -0,0 +1,237 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.collections.Pair; +import com.yahoo.docproc.CallStack; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.*; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.document.update.FieldUpdate; +import com.yahoo.documentapi.messagebus.protocol.*; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.Routable; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.*; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocumentProcessingHandlerTransformingMessagesTestCase extends DocumentProcessingHandlerTestBase { + + private static final String FOOBAR = "foobar"; + private final DocumentType type; + + public DocumentProcessingHandlerTransformingMessagesTestCase() { + type = new DocumentType("foo"); + type.addField("foostring", DataType.STRING); + } + + @Override + public List<Pair<String, CallStack>> getCallStacks() { + CallStack stack = new CallStack(); + stack.addLast(new TransformingDocumentProcessor()); + + ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(1); + stacks.add(new Pair<>(FOOBAR, stack)); + return stacks; + } + + @Override + public DocumentType getType() { + return type; + } + + private Routable sendMessageAndGetResult(DocumentMessage message) throws InterruptedException { + assertTrue(sendMessage(FOOBAR, message)); + + Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS); + assertNotNull(result); + remoteServer.ackMessage(result); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertFalse(reply.hasErrors()); + + return result; + } + + @Test + public void testAllMessages() throws InterruptedException { + put(); + remove(); + update(); + batchDocumentUpdate(); + } + + private void put() throws InterruptedException { + { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:put"))); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(PutDocumentMessage.class)); + PutDocumentMessage outputMsg = (PutDocumentMessage)result; + assertThat(outputMsg.getDocumentPut().getDocument().getFieldValue("foostring").toString(), is("banana")); + } + { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:remove"))); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(RemoveDocumentMessage.class)); + RemoveDocumentMessage outputMsg = (RemoveDocumentMessage)result; + assertThat(outputMsg.getDocumentId().toString(), is("doc:nodocstatus:put:to:remove")); + } + { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:update"))); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(UpdateDocumentMessage.class)); + UpdateDocumentMessage outputMsg = (UpdateDocumentMessage)result; + assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:nodocstatus:put:to:update")); + } + { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:nothing"))); + assertTrue(sendMessage(FOOBAR, message)); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertThat(reply, instanceOf(DocumentReply.class)); + assertFalse(reply.hasErrors()); + } + } + + private void remove() throws InterruptedException { + { + RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:put")); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(PutDocumentMessage.class)); + PutDocumentMessage outputMsg = (PutDocumentMessage)result; + assertThat(outputMsg.getDocumentPut().getDocument().getId().toString(), is("doc:nodocstatus:remove:to:put")); + } + + { + RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:remove")); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(RemoveDocumentMessage.class)); + RemoveDocumentMessage outputMsg = (RemoveDocumentMessage)result; + assertThat(outputMsg.getDocumentId().toString(), is("doc:nodocstatus:remove:to:remove")); + } + { + RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:update")); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(UpdateDocumentMessage.class)); + UpdateDocumentMessage outputMsg = (UpdateDocumentMessage)result; + assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:nodocstatus:remove:to:update")); + } + { + RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:nothing")); + assertTrue(sendMessage(FOOBAR, message)); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertThat(reply, instanceOf(DocumentReply.class)); + assertFalse(reply.hasErrors()); + } + } + + private void update() throws InterruptedException { + { + UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:put")); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(PutDocumentMessage.class)); + PutDocumentMessage outputMsg = (PutDocumentMessage)result; + assertThat(outputMsg.getDocumentPut().getDocument().getId().toString(), is("doc:nodocstatus:update:to:put")); + } + + { + UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:remove")); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(RemoveDocumentMessage.class)); + RemoveDocumentMessage outputMsg = (RemoveDocumentMessage)result; + assertThat(outputMsg.getDocumentId().toString(), is("doc:nodocstatus:update:to:remove")); + } + { + UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:update")); + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(UpdateDocumentMessage.class)); + UpdateDocumentMessage outputMsg = (UpdateDocumentMessage)result; + assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:nodocstatus:update:to:update")); + } + { + UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:nothing")); + assertTrue(sendMessage(FOOBAR, message)); + Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS); + assertNotNull(reply); + assertThat(reply, instanceOf(DocumentReply.class)); + assertFalse(reply.hasErrors()); + } + } + + private void batchDocumentUpdate() throws InterruptedException { + DocumentUpdate doc1 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:batch:nodocstatus:keep:this")); + DocumentUpdate doc2 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:batch:nodocstatus:skip:this")); + + Field testField = getType().getField("foostring"); + doc1.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("1 not yet processed"))); + doc2.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("2 not yet processed"))); + + BatchDocumentUpdateMessage message = new BatchDocumentUpdateMessage(12345); + message.addUpdate(doc1); + message.addUpdate(doc2); + + Routable result = sendMessageAndGetResult(message); + assertThat(result, instanceOf(UpdateDocumentMessage.class)); + DocumentUpdate outputUpd = ((UpdateDocumentMessage)result).getDocumentUpdate(); + assertThat(outputUpd.getId().toString(), is("userdoc:test:12345:batch:nodocstatus:keep:this")); + } + + public class TransformingDocumentProcessor extends DocumentProcessor { + + @Override + public Progress process(Processing processing) { + ListIterator<DocumentOperation> it = processing.getDocumentOperations().listIterator(); + while (it.hasNext()) { + DocumentOperation op = it.next(); + String id = op.getId().toString(); + if ("doc:nodocstatus:put:to:put".equals(id)) { + Document doc = ((DocumentPut)op).getDocument(); + doc.setFieldValue("foostring", new StringFieldValue("banana")); + } else if ("doc:nodocstatus:put:to:remove".equals(id)) { + it.set(new DocumentRemove(new DocumentId(id))); + } else if ("doc:nodocstatus:put:to:update".equals(id)) { + it.set(new DocumentUpdate(getType(), id)); + } else if ("doc:nodocstatus:put:to:nothing".equals(id)) { + it.remove(); + } else if ("doc:nodocstatus:remove:to:put".equals(id)) { + it.set(new DocumentPut(getType(), op.getId())); + } else if ("doc:nodocstatus:remove:to:remove".equals(id)) { + //nada + } else if ("doc:nodocstatus:remove:to:update".equals(id)) { + it.set(new DocumentUpdate(getType(), id)); + } else if ("doc:nodocstatus:remove:to:nothing".equals(id)) { + it.remove(); + } else if ("doc:nodocstatus:update:to:put".equals(id)) { + it.set(new DocumentPut(getType(), op.getId())); + } else if ("doc:nodocstatus:update:to:remove".equals(id)) { + it.set(new DocumentRemove(new DocumentId(id))); + } else if ("doc:nodocstatus:update:to:update".equals(id)) { + //nada + } else if ("doc:nodocstatus:update:to:nothing".equals(id)) { + it.remove(); + } else if ("userdoc:12345:6789:multiop:nodocstatus:keep:this".equals(id)) { + //nada + } else if ("userdoc:12345:6789:multiop:nodocstatus:skip:this".equals(id)) { + it.remove(); + } else if ("userdoc:test:12345:batch:nodocstatus:keep:this".equals(id)) { + //nada + } else if ("userdoc:test:12345:batch:nodocstatus:skip:this".equals(id)) { + it.remove(); + } + } + return Progress.DONE; + } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java new file mode 100644 index 00000000000..8dc811bece6 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java @@ -0,0 +1,131 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.jdisc; + +import com.yahoo.docproc.Processing; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import org.junit.Test; + +import java.net.URI; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.PriorityBlockingQueue; + +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DocumentProcessingTaskPrioritizationTestCase { + + @Test + public void proritization() { + Queue<DocumentProcessingTask> queue = new PriorityBlockingQueue<>(); + + DocumentProcessingTask highest = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST); + DocumentProcessingTask veryhigh = new TestDocumentProcessingTask(DocumentProtocol.Priority.VERY_HIGH); + DocumentProcessingTask high1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGH_1); + DocumentProcessingTask normal_1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); + DocumentProcessingTask low_1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.LOW_1); + DocumentProcessingTask verylow = new TestDocumentProcessingTask(DocumentProtocol.Priority.VERY_LOW); + DocumentProcessingTask lowest = new TestDocumentProcessingTask(DocumentProtocol.Priority.LOWEST); + + DocumentProcessingTask normal_2 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); + DocumentProcessingTask normal_3 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); + DocumentProcessingTask normal_4 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1); + + DocumentProcessingTask highest_2 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST); + DocumentProcessingTask highest_3 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST); + + + queue.add(highest); + queue.add(veryhigh); + queue.add(high1); + queue.add(normal_1); + queue.add(low_1); + queue.add(verylow); + queue.add(lowest); + + queue.add(normal_2); + queue.add(normal_3); + queue.add(normal_4); + + queue.add(highest_2); + queue.add(highest_3); + + assertThat(queue.poll(), sameInstance(highest)); + assertThat(queue.poll(), sameInstance(highest_2)); + assertThat(queue.poll(), sameInstance(highest_3)); + assertThat(queue.poll(), sameInstance(veryhigh)); + assertThat(queue.poll(), sameInstance(high1)); + assertThat(queue.poll(), sameInstance(normal_1)); + assertThat(queue.poll(), sameInstance(normal_2)); + assertThat(queue.poll(), sameInstance(normal_3)); + assertThat(queue.poll(), sameInstance(normal_4)); + assertThat(queue.poll(), sameInstance(low_1)); + assertThat(queue.poll(), sameInstance(verylow)); + assertThat(queue.poll(), sameInstance(lowest)); + assertThat(queue.poll(), nullValue()); + } + + private class TestDocumentProcessingTask extends DocumentProcessingTask { + private TestDocumentProcessingTask(DocumentProtocol.Priority priority) { + super(new TestRequestContext(priority), null, null); + } + } + + private class TestRequestContext implements RequestContext { + private final DocumentProtocol.Priority priority; + + public TestRequestContext(DocumentProtocol.Priority priority) { + this.priority = priority; + } + + @Override + public List<Processing> getProcessings() { + return null; + } + + @Override + public void skip() { + } + + @Override + public void processingDone(List<Processing> processing) { + } + + @Override + public void processingFailed(ErrorCode error, String msg) { + } + + @Override + public void processingFailed(Exception exception) { + } + + @Override + public int getApproxSize() { + return 0; + } + + @Override + public int getPriority() { + return priority.getValue(); + } + + @Override + public boolean isProcessable() { + return true; + } + + @Override + public URI getUri() { + return null; + } + + @Override + public String getServiceName() { + return null; + } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/proxy/SchemaMappingAndAccessesTest.java b/docproc/src/test/java/com/yahoo/docproc/proxy/SchemaMappingAndAccessesTest.java new file mode 100644 index 00000000000..e5d3a4dc137 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/proxy/SchemaMappingAndAccessesTest.java @@ -0,0 +1,566 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.proxy; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import com.yahoo.collections.Pair; +import com.yahoo.config.docproc.SchemamappingConfig; +import com.yahoo.docproc.Accesses; +import com.yahoo.docproc.Accesses.Field; +import com.yahoo.docproc.Call; +import com.yahoo.docproc.DocumentProcessingAbstractTestCase.TestDocumentProcessor1; +import com.yahoo.docproc.DocumentProcessor; +import com.yahoo.docproc.Processing; +import com.yahoo.document.DataType; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentType; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.StructDataType; +import com.yahoo.document.annotation.Annotation; +import com.yahoo.document.annotation.AnnotationType; +import com.yahoo.document.annotation.SpanTree; +import com.yahoo.document.datatypes.Array; +import com.yahoo.document.datatypes.IntegerFieldValue; +import com.yahoo.document.datatypes.StringFieldValue; +import com.yahoo.document.datatypes.Struct; +import com.yahoo.document.datatypes.StructuredFieldValue; +import com.yahoo.document.update.FieldUpdate; + +public class SchemaMappingAndAccessesTest extends junit.framework.TestCase { + + private Document getDoc() { + DocumentType type = new DocumentType("album"); + AnnotationType personType = new AnnotationType("person"); + Annotation person = new Annotation(personType); + type.addField("title", DataType.STRING); + type.addField("artist", DataType.STRING); + type.addField("guitarist", DataType.STRING); + type.addField("year", DataType.INT); + type.addField("labels", DataType.getArray(DataType.STRING)); + Document doc = new Document(type, new DocumentId("doc:map:test:1")); + doc.setFieldValue("title", new StringFieldValue("Black Rock")); + StringFieldValue joe = new StringFieldValue("Joe Bonamassa"); + joe.setSpanTree(new SpanTree("mytree").annotate(person)); + doc.setFieldValue("artist", joe); + doc.setFieldValue("year", new IntegerFieldValue(2010)); + Array<StringFieldValue> labels = new Array<>(type.getField("labels").getDataType()); + labels.add(new StringFieldValue("audun")); + labels.add(new StringFieldValue("tylden")); + doc.setFieldValue("labels", labels); + + StructDataType personStructType = new StructDataType("artist"); + personStructType.addField(new com.yahoo.document.Field("firstname", DataType.STRING)); + personStructType.addField(new com.yahoo.document.Field("lastname", DataType.STRING)); + type.addField("listeners", DataType.getArray(personStructType)); + + Array<Struct> listeners = new Array<>(type.getField("listeners").getDataType()); + + Struct listenerOne = new Struct(personStructType); + listenerOne.setFieldValue("firstname", new StringFieldValue("per")); + listenerOne.setFieldValue("lastname", new StringFieldValue("olsen")); + Struct listenerTwo = new Struct(personStructType); + listenerTwo.setFieldValue("firstname", new StringFieldValue("anders")); + listenerTwo.setFieldValue("lastname", new StringFieldValue("and")); + + listeners.add(listenerOne); + listeners.add(listenerTwo); + + doc.setFieldValue("listeners", listeners); + + return doc; + } + + public void testMappingArrays() { + Document doc = getDoc(); + DocumentProcessor proc = new TestMappingArrayProcessor(); + + Map<String, String> fieldMap = new HashMap<>(); + fieldMap.put("label", "labels[0]"); + ProxyDocument mapped = new ProxyDocument(proc, doc, fieldMap); + + Processing p = Processing.of(new DocumentPut(mapped)); + proc.process(p); + + assertEquals(2, ((Array<StringFieldValue>) doc.getFieldValue("labels")).size()); + assertEquals(new StringFieldValue("EMI"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(0)); + assertEquals(new StringFieldValue("tylden"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(1)); + + + fieldMap.clear(); + fieldMap.put("label", "labels[2]"); + mapped = new ProxyDocument(proc, doc, fieldMap); + + p = Processing.of(new DocumentPut(mapped)); + try { + proc.process(p); + fail("Should not have worked"); + } catch (IllegalArgumentException iae) { + //ok! + } + assertEquals(2, ((Array<StringFieldValue>) doc.getFieldValue("labels")).size()); + assertEquals(new StringFieldValue("EMI"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(0)); + assertEquals(new StringFieldValue("tylden"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(1)); + } + + public void testMappingStructsInArrays() { + Document doc = getDoc(); + DocumentProcessor proc = new TestMappingStructInArrayProcessor(); + + Map<String, String> fieldMap = new HashMap<>(); + fieldMap.put("name", "listeners[0].firstname"); + ProxyDocument mapped = new ProxyDocument(proc, doc, fieldMap); + + Processing p = Processing.of(new DocumentPut(mapped)); + proc.process(p); + + assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size()); + assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString())); + assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString())); + assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString())); + assertEquals("and", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname")).getString())); + + + fieldMap.clear(); + fieldMap.put("name", "listeners[2].firstname"); + mapped = new ProxyDocument(proc, doc, fieldMap); + + p = Processing.of(new DocumentPut(mapped)); + try { + proc.process(p); + fail("Should not have worked"); + } catch (IllegalArgumentException iae) { + //ok! + } + assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size()); + assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString())); + assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString())); + assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString())); + assertEquals("and", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname")).getString())); + + + //test remove: + proc = new TestRemovingMappingStructInArrayProcessor(); + + fieldMap.clear(); + fieldMap.put("name", "listeners[1].lastname"); + mapped = new ProxyDocument(proc, doc, fieldMap); + + p = Processing.of(new DocumentPut(mapped)); + proc.process(p); + + assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size()); + assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString())); + assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString())); + assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString())); + assertNull(((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname")); + + + fieldMap.clear(); + fieldMap.put("name", "listeners[2].lastname"); + mapped = new ProxyDocument(proc, doc, fieldMap); + + p = Processing.of(new DocumentPut(mapped)); + try { + proc.process(p); + fail("Should not have worked"); + } catch (IllegalArgumentException iae) { + //ok! + } + assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size()); + assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString())); + assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString())); + assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString())); + assertNull(((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname")); + + } + + + public void testMappingSpanTrees() { + Document doc = getDoc(); + Map<String, String> fieldMap = new HashMap<>(); + fieldMap.put("t", "title"); + fieldMap.put("a", "artist"); + fieldMap.put("g", "guitarist"); + ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap); + Iterator<SpanTree> itSpanTreesDoc = ((StringFieldValue) doc.getFieldValue("artist")).getSpanTrees().iterator(); + Iterator<Annotation> itAnnotDoc = itSpanTreesDoc.next().iterator(); + Iterator<SpanTree> itSpanTreesMapped = ((StringFieldValue) mapped.getFieldValue("artist")).getSpanTrees().iterator(); + Iterator<Annotation> itAnnotMapped = itSpanTreesMapped.next().iterator(); + + assertEquals(itAnnotDoc.next().getType().getName(), "person"); + assertFalse(itAnnotDoc.hasNext()); + assertEquals(itAnnotMapped.next().getType().getName(), "person"); + assertFalse(itAnnotMapped.hasNext()); + + AnnotationType guitaristType = new AnnotationType("guitarist"); + Annotation guitarist = new Annotation(guitaristType); + StringFieldValue bona = new StringFieldValue("Bonamassa"); + bona.setSpanTree(new SpanTree("mytree").annotate(guitarist)); + StringFieldValue clapton = new StringFieldValue("Clapton"); + mapped.setFieldValue("a", bona); + mapped.setFieldValue("g", clapton); + + itSpanTreesDoc = ((StringFieldValue) doc.getFieldValue("artist")).getSpanTrees().iterator(); + itAnnotDoc = itSpanTreesDoc.next().iterator(); + itSpanTreesMapped = ((StringFieldValue) mapped.getFieldValue("artist")).getSpanTrees().iterator(); + itAnnotMapped = itSpanTreesMapped.next().iterator(); + + assertEquals(itAnnotDoc.next().getType().getName(), "guitarist"); + assertFalse(itAnnotDoc.hasNext()); + assertEquals(itAnnotMapped.next().getType().getName(), "guitarist"); + assertFalse(itAnnotMapped.hasNext()); + + assertSame(((StringFieldValue) doc.getFieldValue("artist")).getSpanTrees().iterator().next(), ((StringFieldValue) mapped.getFieldValue("a")).getSpanTrees().iterator().next()); + //assertSame(clapton, mapped.getFieldValue("g")); + //assertSame(bona, mapped.getFieldValue("a")); + } + + public void testMappedDoc() { + Document doc = getDoc(); + Map<String, String> fieldMap = new HashMap<>(); + fieldMap.put("t", "title"); + fieldMap.put("a", "artist"); + ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap); + //Document mapped=doc; + //mapped.setFieldMap(fieldMap); + assertEquals(new StringFieldValue("Black Rock"), mapped.getFieldValue("t")); + //assertEquals(new StringFieldValue("Black Rock"), proxy.getFieldValue(new com.yahoo.document.Field("t"))); + assertEquals(new StringFieldValue("Joe Bonamassa").getWrappedValue(), mapped.getFieldValue("a").getWrappedValue()); + mapped.setFieldValue("t", new StringFieldValue("The Ballad Of John Henry")); + StringFieldValue bona = new StringFieldValue("Bonamassa"); + mapped.setFieldValue("a", bona); + //mapped.setFieldValue("a", new StringFieldValue("Bonamassa")); + assertEquals(new StringFieldValue("The Ballad Of John Henry"), doc.getFieldValue("title")); + assertEquals(new StringFieldValue("The Ballad Of John Henry"), mapped.getFieldValue("t")); + assertEquals(new StringFieldValue("Bonamassa"), doc.getFieldValue("artist")); + assertEquals(new StringFieldValue("Bonamassa"), mapped.getFieldValue("a")); + mapped.setFieldValue("a", mapped.getFieldValue("a") + "Hughes"); + assertEquals(new StringFieldValue("BonamassaHughes"), mapped.getFieldValue("a")); + // Verify consistency when using string values to manipluate annotation span trees + StringFieldValue unmapped1 = (StringFieldValue) doc.getFieldValue("artist"); + StringFieldValue unmapped2 = (StringFieldValue) doc.getFieldValue("artist"); + assertTrue(unmapped1==unmapped2); + unmapped1.setSpanTree(new SpanTree("test")); + assertEquals(unmapped2.getSpanTree("test").getName(), "test"); + + StringFieldValue mapped1 = (StringFieldValue) mapped.getFieldValue("a"); + mapped1.setSpanTree(new SpanTree("test2")); + StringFieldValue mapped2 = (StringFieldValue) mapped.getFieldValue("a"); + assertTrue(mapped1==mapped2); + assertEquals(mapped2.getSpanTree("test2").getName(), "test2"); + + mapped.removeFieldValue("a"); + assertEquals(mapped.getFieldValue("a"), null); + mapped.removeFieldValue(mapped.getField("t")); + assertEquals(mapped.getFieldValue("t"), null); + mapped.setFieldValue("a", new StringFieldValue("Bonamassa")); + assertEquals(new StringFieldValue("Bonamassa"), doc.getFieldValue("artist")); + mapped.removeFieldValue("a"); + assertEquals(mapped.getFieldValue("a"), null); + } + + public void testMappedDocAPI() { + Document doc = getDoc(); + Map<String, String> fieldMap = new HashMap<>(); + fieldMap.put("t", "title"); + fieldMap.put("a", "artist"); + ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap); + assertEquals(mapped.getFieldValue("title"), doc.getFieldValue("title")); + assertEquals(mapped.getFieldValue(new com.yahoo.document.Field("title")), doc.getFieldValue((new com.yahoo.document.Field("title")))); + mapped.setFieldValue("title", "foo"); + assertEquals(doc.getFieldValue("title").getWrappedValue(), "foo"); + assertEquals(mapped.getWrappedDocumentOperation().getId().toString(), "doc:map:test:1"); + assertEquals(doc, mapped); + assertEquals(doc.toString(), mapped.toString()); + assertEquals(doc.hashCode(), mapped.hashCode()); + assertEquals(doc.clone(), mapped.clone()); + assertEquals(doc.iterator().hasNext(), mapped.iterator().hasNext()); + assertEquals(doc.getId(), mapped.getId()); + assertEquals(doc.getDataType(), mapped.getDataType()); + mapped.setLastModified(56l); + assertEquals(doc.getLastModified(), (Long)56l); + assertEquals(mapped.getLastModified(), (Long)56l); + mapped.setId(new DocumentId("doc:map:test:2")); + assertEquals(mapped.getId().toString(), "doc:map:test:2"); + assertEquals(doc.getId().toString(), "doc:map:test:2"); + assertEquals(doc.getHeader(), mapped.getHeader()); + assertEquals(doc.getBody(), mapped.getBody()); + assertEquals(doc.getSerializedSize(), mapped.getSerializedSize()); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ByteArrayOutputStream bos2 = new ByteArrayOutputStream(); + mapped.serialize(bos); + doc.serialize(bos2); + assertEquals(bos.toString(), bos2.toString()); + assertEquals(mapped.toXml(), doc.toXml()); + assertEquals(mapped.getFieldCount(), doc.getFieldCount()); + assertTrue(mapped.getDocument()==doc); + + mapped.clear(); + assertNull(mapped.getFieldValue("title")); + assertNull(doc.getFieldValue("title")); + mapped.setDataType(new DocumentType("newType")); + assertEquals(doc.getDataType().getName(), "newType"); + } + + public void testMappedDocUpdateAPI() { + Document doc = getDoc(); + DocumentType type = doc.getDataType(); + DocumentUpdate dud = new DocumentUpdate(type, new DocumentId("doc:map:test:1")); + FieldUpdate assignSingle = FieldUpdate.createAssign(type.getField("title"), new StringFieldValue("something")); + Map<String, String> fieldMap = new HashMap<>(); + fieldMap.put("t", "title"); + fieldMap.put("a", "artist"); + ProxyDocumentUpdate pup = new ProxyDocumentUpdate(dud, fieldMap); + pup.addFieldUpdate(assignSingle); + assertEquals(pup.getFieldUpdates(), dud.getFieldUpdates()); + assertEquals(pup.getDocumentType(), dud.getDocumentType()); + assertEquals(pup.getFieldUpdate(new com.yahoo.document.Field("title")).size(), 1); + assertEquals(pup.getFieldUpdate(0), dud.getFieldUpdate(0)); + assertEquals(pup.getFieldUpdate("title"), dud.getFieldUpdate("title")); + assertEquals(pup.getId(), dud.getId()); + assertEquals(pup.getType(), dud.getType()); + assertEquals(pup.applyTo(doc), dud); + assertEquals(doc.getFieldValue("title").getWrappedValue(), "something"); + assertEquals(pup, dud); + assertEquals(pup.hashCode(), dud.hashCode()); + assertEquals(pup.toString(), dud.toString()); + assertEquals(pup.size(), dud.size()); + assertEquals(pup.getWrappedDocumentOperation().getId().toString(), "doc:map:test:1"); + } + + public void testMappedDocStruct() { + StructDataType materialsStructType = new StructDataType("materialstype"); + materialsStructType.addField(new com.yahoo.document.Field("ceiling", DataType.STRING)); + materialsStructType.addField(new com.yahoo.document.Field("walls", DataType.STRING)); + + DocumentType docType = new DocumentType("album"); + docType.addField("title", DataType.STRING); + docType.addField("artist", DataType.STRING); + StructDataType storeStructType = new StructDataType("storetype"); + storeStructType.addField(new com.yahoo.document.Field("name", DataType.STRING)); + storeStructType.addField(new com.yahoo.document.Field("city", DataType.STRING)); + storeStructType.addField(new com.yahoo.document.Field("materials", materialsStructType)); + docType.addField("store", storeStructType); + + Document doc = new Document(docType, new DocumentId("doc:map:test:1")); + doc.setFieldValue("title", new StringFieldValue("Black Rock")); + doc.setFieldValue("artist", new StringFieldValue("Joe Bonamassa")); + Struct material = new Struct(materialsStructType); + material.setFieldValue("ceiling", new StringFieldValue("wood")); + material.setFieldValue("walls", new StringFieldValue("brick")); + Struct store = new Struct(storeStructType); + store.setFieldValue("name", new StringFieldValue("Platekompaniet")); + store.setFieldValue("city", new StringFieldValue("Trondheim")); + store.setFieldValue(storeStructType.getField("materials"), material); + doc.setFieldValue(docType.getField("store"), store); + + Map<String, String> fieldMap = new HashMap<>(); + fieldMap.put("t", "title"); + fieldMap.put("c", "store.city"); + fieldMap.put("w", "store.materials.walls"); + ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap); + assertEquals(new StringFieldValue("Trondheim"), mapped.getFieldValue("c")); + assertEquals(new StringFieldValue("Black Rock"), mapped.getFieldValue("t")); + assertEquals(new StringFieldValue("brick"), mapped.getFieldValue("w")); + assertEquals(new StringFieldValue("brick"), material.getFieldValue("walls")); + mapped.setFieldValue("c", new StringFieldValue("Steinkjer")); + mapped.setFieldValue("w", new StringFieldValue("plaster")); + assertEquals(new StringFieldValue("plaster"), mapped.getFieldValue("w")); + assertEquals(new StringFieldValue("plaster"), material.getFieldValue("walls")); + assertEquals(new StringFieldValue("Steinkjer"), store.getFieldValue("city")); + assertEquals(new StringFieldValue("Steinkjer"), mapped.getFieldValue("c")); + assertEquals(new StringFieldValue("Steinkjer"), mapped.getFieldValue("c")); + mapped.setFieldValue("c", new StringFieldValue("Levanger")); + assertEquals(new StringFieldValue("Levanger"), store.getFieldValue("city")); + assertEquals(new StringFieldValue("Levanger"), mapped.getFieldValue("c")); + mapped.setFieldValue("c", mapped.getFieldValue("c") + "Kommune"); + assertEquals(new StringFieldValue("LevangerKommune"), mapped.getFieldValue("c")); + //mapped.set(mapped.getField("c"), mapped.get("c")+"Styre"); + //assertEquals(new StringFieldValue("LevangerKommuneStyre"), mapped.getFieldValue("c")); + } + + public void testSchemaMap() { + SchemaMap map = new SchemaMap(); + map.addMapping("mychain", "com.yahoo.MyDocProc", "mydoctype", "inDoc1", "inProc1"); + map.addMapping("mychain", "com.yahoo.MyDocProc", "mydoctype", "inDoc2", "inProc2"); + Map<Pair<String, String>, String> cMap = map.chainMap("mychain", "com.yahoo.MyDocProc"); + assertEquals("inDoc1", cMap.get(new Pair<>("mydoctype", "inProc1"))); + assertEquals("inDoc2", cMap.get(new Pair<>("mydoctype", "inProc2"))); + assertNull(cMap.get(new Pair<>("invalidtype", "inProc2"))); + Map<Pair<String, String>, String> noMap = map.chainMap("invalidchain", "com.yahoo.MyDocProc"); + Map<Pair<String, String>, String> noMap2 = map.chainMap("mychain", "com.yahoo.MyInvalidDocProc"); + assertTrue(noMap.isEmpty()); + assertTrue(noMap2.isEmpty()); + + DocumentProcessor proc = new TestDocumentProcessor1(); + proc.setFieldMap(cMap); + Map<String, String> dMap = proc.getDocMap("mydoctype"); + assertEquals("inDoc1", dMap.get("inProc1")); + assertEquals("inDoc2", dMap.get("inProc2")); + } + + public void testSchemaMapKey() { + SchemaMap map = new SchemaMap(null); + SchemaMap.SchemaMapKey key1 = map.new SchemaMapKey("chain", "docproc", "doctype", "from"); + SchemaMap.SchemaMapKey key1_1 = map.new SchemaMapKey("chain", "docproc", "doctype", "from"); + SchemaMap.SchemaMapKey key2 = map.new SchemaMapKey("chain", "docproc", "doctype2", "from"); + assertTrue(key1.equals(key1_1)); + assertFalse(key1.equals(key2)); + } + + public void testSchemaMapConfig() { + SchemaMap map = new SchemaMap(null); + SchemamappingConfig.Builder scb = new SchemamappingConfig.Builder(); + scb.fieldmapping(new SchemamappingConfig.Fieldmapping.Builder().chain("mychain").docproc("mydocproc").doctype("mydoctype"). + indocument("myindoc").inprocessor("myinprocessor")); + map.configure(new SchemamappingConfig(scb)); + assertEquals(map.chainMap("mychain", "mydocproc").get(new Pair<>("mydoctype", "myinprocessor")), "myindoc"); + } + + public void testSchemaMapNoDocType() { + SchemaMap map = new SchemaMap(null); + map.addMapping("mychain", "com.yahoo.MyDocProc", null, "inDoc1", "inProc1"); + map.addMapping("mychain", "com.yahoo.MyDocProc", null, "inDoc2", "inProc2"); + Map<Pair<String, String>, String> cMap = map.chainMap("mychain", "com.yahoo.MyDocProc"); + DocumentProcessor proc = new TestDocumentProcessor1(); + proc.setFieldMap(cMap); + Map<String, String> dMap = proc.getDocMap("mydoctype"); + assertEquals("inDoc1", dMap.get("inProc1")); + assertEquals("inDoc2", dMap.get("inProc2")); + } + + public void testProxyAndSecure() { + DocumentProcessor procOK = new TestDPSecure(); + Map<Pair<String, String>, String> fieldMap = new HashMap<>(); + fieldMap.put(new Pair<>("album", "titleMapped"), "title"); + procOK.setFieldMap(fieldMap); + DocumentPut put = new DocumentPut(getDoc()); + Document proxyDoc = new Call(procOK).configDoc(procOK, put).getDocument(); + procOK.process(Processing.of(new DocumentPut(proxyDoc))); + assertEquals(proxyDoc.getFieldValue("title").toString(), "MyTitle MyTitle"); + } + + public void testProxyAndSecureSecureFailing() { + DocumentProcessor procInsecure = new TestDPInsecure(); + Map<Pair<String, String>, String> fieldMap = new HashMap<>(); + fieldMap.put(new Pair<>("album", "titleMapped"), "title"); + procInsecure.setFieldMap(fieldMap); + DocumentPut put = new DocumentPut(getDoc()); + Document doc = new Call(procInsecure).configDoc(procInsecure, put).getDocument(); + try { + procInsecure.process(Processing.of(new DocumentPut(doc))); + fail("Insecure docproc went through"); + } catch (Exception e) { + assertTrue(e.getMessage().matches(".*allowed.*")); + } + //assertEquals(doc.get("title"), "MyTitle"); + } + + /** + * To make it less likely to break schema mapping, we enforce that ProxyDocument does wrap every public + * non-static, non-final method on Document and StructuredFieldValue + */ + public void testVerifyProxyDocumentOverridesEverything() { + List<Method> allPublicFromProxyDocument = new ArrayList<>(); + for (Method m : ProxyDocument.class.getDeclaredMethods()) { + if (Modifier.isPublic(m.getModifiers())) { + allPublicFromProxyDocument.add(m); + } + } + List<Method> allPublicFromDoc = new ArrayList<>(); + for (Method m : Document.class.getDeclaredMethods()) { + if (mustBeOverriddenInProxyDocument(m)) { + allPublicFromDoc.add(m); + } + } + for (Method m : StructuredFieldValue.class.getDeclaredMethods()) { + if (mustBeOverriddenInProxyDocument(m)) { + allPublicFromDoc.add(m); + } + } + + for (Method m : allPublicFromDoc) { + boolean thisOneOk=false; + for (Method pdM : allPublicFromProxyDocument) { + if (sameNameAndParams(m, pdM)) thisOneOk=true; + } + if (!thisOneOk) { + throw new RuntimeException("ProxyDocument must override all public methods from Document. " + + "Missing: '"+m+"'. If the method doesn't need field mapping or @Accesses check, just " + + "override it and delegate the call to 'doc'."); + + } + } + } + + private boolean mustBeOverriddenInProxyDocument(Method m) { + if (!Modifier.isPublic(m.getModifiers())) return false; + if (Modifier.isStatic(m.getModifiers())) return false; + if (Modifier.isFinal(m.getModifiers())) return false; + return true; + } + + private boolean sameNameAndParams(Method m1, Method m2) { + if (!m1.getName().equals(m2.getName())) return false; + if (m1.getParameterTypes().length!=m2.getParameterTypes().length) return false; + for (int i = 0; i<m1.getParameterTypes().length; i++) { + if (!m1.getParameterTypes()[i].equals(m2.getParameterTypes()[i])) return false; + } + return true; + } + + @Accesses(value = { @Field(dataType = "String", description = "", name = "titleMapped") }) + public static class TestDPSecure extends DocumentProcessor { + + public Progress process(Processing processing) { + Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument(); + document.setFieldValue("titleMapped", new StringFieldValue("MyTitle")); + document.setFieldValue("titleMapped", new StringFieldValue(document.getFieldValue("titleMapped").toString() + " MyTitle")); + return Progress.DONE; + } + } + + @Accesses(value = { @Field(dataType = "String", description = "", name = "titleMappedFoo") }) + public static class TestDPInsecure extends DocumentProcessor { + + public Progress process(Processing processing) { + Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument(); + document.setFieldValue("titleMapped", new StringFieldValue("MyTitle")); + document.setFieldValue("titleMapped", new StringFieldValue(document.getFieldValue("titleMapped").toString() + " MyTitle")); + return Progress.DONE; + } + } + + public static class TestMappingArrayProcessor extends DocumentProcessor { + public Progress process(Processing processing) { + Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument(); + document.setFieldValue("label", new StringFieldValue("EMI")); + return Progress.DONE; + } + } + + public static class TestMappingStructInArrayProcessor extends DocumentProcessor { + public Progress process(Processing processing) { + Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();; + document.setFieldValue("name", new StringFieldValue("peter")); + return Progress.DONE; + } + } + + public static class TestRemovingMappingStructInArrayProcessor extends DocumentProcessor { + public Progress process(Processing processing) { + Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();; + document.removeFieldValue("name"); + return Progress.DONE; + } + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/util/SplitterJoinerTestCase.java b/docproc/src/test/java/com/yahoo/docproc/util/SplitterJoinerTestCase.java new file mode 100644 index 00000000000..2d5a61fa005 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/util/SplitterJoinerTestCase.java @@ -0,0 +1,85 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.docproc.util; + +import com.yahoo.config.subscription.ConfigGetter; +import com.yahoo.config.docproc.SplitterJoinerDocumentProcessorConfig; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.config.DocumentmanagerConfig; +import com.yahoo.docproc.Processing; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.datatypes.Array; +import com.yahoo.document.datatypes.StringFieldValue; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class SplitterJoinerTestCase { + + @Test + public void testSplitJoin() { + ConfigGetter<SplitterJoinerDocumentProcessorConfig> getter = new ConfigGetter<>(SplitterJoinerDocumentProcessorConfig.class); + ConfigGetter<DocumentmanagerConfig> docManGetter = new ConfigGetter<>(DocumentmanagerConfig.class); + + SplitterJoinerDocumentProcessorConfig cfg = + getter.getConfig("file:src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg"); + DocumentmanagerConfig docManCfg = + docManGetter.getConfig("file:src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg"); + + SplitterDocumentProcessor splitter = new SplitterDocumentProcessor(cfg, docManCfg); + + DocumentTypeManager manager = splitter.manager; + + + /**** Create documents: ****/ + + Document inner1 = new Document(manager.getDocumentType("docindoc"), "doc:inner:number:one"); + inner1.setFieldValue("name", new StringFieldValue("Donald Duck")); + inner1.setFieldValue("content", new StringFieldValue("Lives in Duckburg")); + Document inner2 = new Document(manager.getDocumentType("docindoc"), "doc:inner:number:two"); + inner2.setFieldValue("name", new StringFieldValue("Uncle Scrooge")); + inner2.setFieldValue("content", new StringFieldValue("Lives in Duckburg, too.")); + + Array<Document> innerArray = (Array<Document>) manager.getDocumentType("outerdoc").getField("innerdocuments").getDataType().createFieldValue(); + innerArray.add(inner1); + innerArray.add(inner2); + + Document outer = new Document(manager.getDocumentType("outerdoc"), "doc:outer:the:only:one"); + outer.setFieldValue("innerdocuments", innerArray); + + /**** End create documents ****/ + + + Processing p = Processing.of(new DocumentPut(outer)); + splitter.process(p); + + assertEquals(2, p.getDocumentOperations().size()); + assertThat(((DocumentPut)(p.getDocumentOperations().get(0))).getDocument(), sameInstance(inner1)); + assertThat(((DocumentPut)(p.getDocumentOperations().get(1))).getDocument(), sameInstance(inner2)); + assertThat(((DocumentPut)(p.getVariable(cfg.contextFieldName()))).getDocument(), sameInstance(outer)); + assertThat(outer.getFieldValue("innerdocuments"), sameInstance(innerArray)); + assertTrue(innerArray.isEmpty()); + + + JoinerDocumentProcessor joiner = new JoinerDocumentProcessor(cfg, docManCfg); + + joiner.process(p); + + assertThat(p.getDocumentOperations().size(), equalTo(1)); + assertThat(((DocumentPut)p.getDocumentOperations().get(0)).getDocument(), sameInstance(outer)); + assertThat(p.getVariable(cfg.contextFieldName()), nullValue()); + assertThat(outer.getFieldValue("innerdocuments"), sameInstance(innerArray)); + assertThat(innerArray.size(), equalTo(2)); + assertThat(innerArray.get(0), sameInstance(inner1)); + assertThat(innerArray.get(1), sameInstance(inner2)); + } + +} diff --git a/docproc/src/test/java/com/yahoo/docproc/util/docindoc.sd b/docproc/src/test/java/com/yahoo/docproc/util/docindoc.sd new file mode 100644 index 00000000000..83bc4254fb2 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/util/docindoc.sd @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +search docindoc { + document docindoc { + field name type string { header } + field content type string { body } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg b/docproc/src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg new file mode 100644 index 00000000000..3347c3127b5 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg @@ -0,0 +1,42 @@ +enablecompression false +datatype[7] +datatype[0].id -1407012075 +datatype[0].structtype[1] +datatype[0].structtype[0].name "outerdoc.body" +datatype[0].structtype[0].version 0 +datatype[0].structtype[0].field[1] +datatype[0].structtype[0].field[0].datatype -2035324352 +datatype[0].structtype[0].field[0].name "innerdocuments" +datatype[1].id -1686125086 +datatype[1].structtype[1] +datatype[1].structtype[0].name "docindoc.header" +datatype[1].structtype[0].version 0 +datatype[1].structtype[0].field[1] +datatype[1].structtype[0].field[0].datatype 2 +datatype[1].structtype[0].field[0].name "name" +datatype[2].id -2035324352 +datatype[2].arraytype[1] +datatype[2].arraytype[0].datatype 1447635645 +datatype[3].id -2040625920 +datatype[3].structtype[1] +datatype[3].structtype[0].name "outerdoc.header" +datatype[3].structtype[0].version 0 +datatype[4].id 1447635645 +datatype[4].documenttype[1] +datatype[4].documenttype[0].bodystruct 2030224503 +datatype[4].documenttype[0].headerstruct -1686125086 +datatype[4].documenttype[0].name "docindoc" +datatype[4].documenttype[0].version 0 +datatype[5].id 1748635999 +datatype[5].documenttype[1] +datatype[5].documenttype[0].bodystruct -1407012075 +datatype[5].documenttype[0].headerstruct -2040625920 +datatype[5].documenttype[0].name "outerdoc" +datatype[5].documenttype[0].version 0 +datatype[6].id 2030224503 +datatype[6].structtype[1] +datatype[6].structtype[0].name "docindoc.body" +datatype[6].structtype[0].version 0 +datatype[6].structtype[0].field[1] +datatype[6].structtype[0].field[0].datatype 2 +datatype[6].structtype[0].field[0].name "content" diff --git a/docproc/src/test/java/com/yahoo/docproc/util/outerdoc.sd b/docproc/src/test/java/com/yahoo/docproc/util/outerdoc.sd new file mode 100644 index 00000000000..dbdf3c32cd5 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/util/outerdoc.sd @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +search outerdoc { + document outerdoc { + field innerdocuments type array<docindoc> { body } + } +} diff --git a/docproc/src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg b/docproc/src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg new file mode 100644 index 00000000000..16c26f7a6d8 --- /dev/null +++ b/docproc/src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg @@ -0,0 +1,2 @@ +documentTypeName "outerdoc" +arrayFieldName "innerdocuments" diff --git a/docproc/src/test/vespa-configdef/string.def b/docproc/src/test/vespa-configdef/string.def new file mode 100644 index 00000000000..cfc0ecc578c --- /dev/null +++ b/docproc/src/test/vespa-configdef/string.def @@ -0,0 +1,5 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +version=1 + +namespace=config.docproc +stringVal string default="_default_" |