aboutsummaryrefslogtreecommitdiffstats
path: root/docproc
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /docproc
Publish
Diffstat (limited to 'docproc')
-rw-r--r--docproc/.gitignore7
-rw-r--r--docproc/OWNERS1
-rw-r--r--docproc/README1
-rw-r--r--docproc/pom.xml90
-rw-r--r--docproc/src/main/java/com/yahoo/config/docproc/package-info.java5
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java44
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/Accesses.java55
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/Call.java179
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/CallStack.java402
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java185
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/DocprocService.java373
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java11
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java183
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java15
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/Processing.java286
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java13
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java120
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java14
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/configuration/.gitignore0
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/documentstatus/.gitignore0
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java58
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java58
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java235
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java169
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java235
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java66
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java235
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MessageFactory.java67
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java118
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ResponseMerger.java56
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/package-info.java5
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/metric/NullMetric.java28
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/jdisc/package-info.java5
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/package-info.java7
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java361
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java119
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/proxy/SchemaMap.java131
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/util/JoinerDocumentProcessor.java68
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/util/SplitterDocumentProcessor.java147
-rw-r--r--docproc/src/main/java/com/yahoo/docproc/util/package-info.java7
-rw-r--r--docproc/src/main/resources/configdefinitions/docproc.def12
-rw-r--r--docproc/src/main/resources/configdefinitions/schemamapping.def20
-rw-r--r--docproc/src/main/resources/configdefinitions/splitter-joiner-document-processor.def12
-rw-r--r--docproc/src/test/cfg/docproc-chain-arguments.cfg20
-rw-r--r--docproc/src/test/cfg/docproc-chain-empty.cfg6
-rw-r--r--docproc/src/test/cfg/docproc-chain-parameters.cfg24
-rw-r--r--docproc/src/test/cfg/docproc-chain-slow.cfg4
-rw-r--r--docproc/src/test/cfg/docproc-chain.cfg5
-rw-r--r--docproc/src/test/cfg/invalid/docproc-chain-nomapconstructor.cfg7
-rw-r--r--docproc/src/test/cfg/invalid/docproc-chain-nopublicconstructor.cfg4
-rw-r--r--docproc/src/test/cfg/invalid/docproc-chain-nostringconstructor.cfg4
-rw-r--r--docproc/src/test/cfg/invalid/docproc-chain-ok.cfg4
-rw-r--r--docproc/src/test/cfg/invalid/docproc-chain-throwingexceptioninconstructor.cfg4
-rw-r--r--docproc/src/test/cfg/messagebus/docproc-chain.cfg2
-rw-r--r--docproc/src/test/cfg/messagebus/docproc.cfg4
-rw-r--r--docproc/src/test/cfg/messagebus/documentmanager.cfg36
-rw-r--r--docproc/src/test/cfg/server/docproc-chain.cfg3
-rw-r--r--docproc/src/test/cfg/server/docproc.cfg4
-rw-r--r--docproc/src/test/cfg/server/documentmanager.cfg36
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/AccessesAnnotationTestCase.java68
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/CallStackTestCase.java291
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java90
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java86
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java28
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java88
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java94
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java101
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java47
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/IncrementingDocumentProcessor.java19
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java27
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/ProcessingTestCase.java50
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java103
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java62
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java152
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java97
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java83
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java230
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerBasicTestCase.java79
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java215
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java131
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTransformingMessagesTestCase.java237
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java131
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/proxy/SchemaMappingAndAccessesTest.java566
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/util/SplitterJoinerTestCase.java85
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/util/docindoc.sd7
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg42
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/util/outerdoc.sd6
-rw-r--r--docproc/src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg2
-rw-r--r--docproc/src/test/vespa-configdef/string.def5
89 files changed, 7592 insertions, 0 deletions
diff --git a/docproc/.gitignore b/docproc/.gitignore
new file mode 100644
index 00000000000..245fc81a1b5
--- /dev/null
+++ b/docproc/.gitignore
@@ -0,0 +1,7 @@
+target
+docproc.iml
+.settings
+.classpath
+.project
+/tmp
+/pom.xml.build
diff --git a/docproc/OWNERS b/docproc/OWNERS
new file mode 100644
index 00000000000..31af040f698
--- /dev/null
+++ b/docproc/OWNERS
@@ -0,0 +1 @@
+bratseth
diff --git a/docproc/README b/docproc/README
new file mode 100644
index 00000000000..d2b667c2565
--- /dev/null
+++ b/docproc/README
@@ -0,0 +1 @@
+Vespa Document Processing Framework (Java).
diff --git a/docproc/pom.xml b/docproc/pom.xml
new file mode 100644
index 00000000000..fe7228ba47d
--- /dev/null
+++ b/docproc/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>6-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>docproc</artifactId>
+ <packaging>jar</packaging>
+ <version>6-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>provided-dependencies</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>component</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>container-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-bundle</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>messagebus-disc</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>container-messagebus</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-class-plugin</artifactId>
+ <version>${project.version}</version>
+ <executions>
+ <execution>
+ <id>config-gen</id>
+ <goals>
+ <goal>config-gen</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>configgen-test-defs</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>config-gen</goal>
+ </goals>
+ <configuration>
+ <defFilesDirectories>src/test/vespa-configdef</defFilesDirectories>
+ <outputDirectory>target/generated-test-sources/vespa-configgen-plugin</outputDirectory>
+ <testConfig>true</testConfig>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/docproc/src/main/java/com/yahoo/config/docproc/package-info.java b/docproc/src/main/java/com/yahoo/config/docproc/package-info.java
new file mode 100644
index 00000000000..3a2b850c609
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/config/docproc/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.config.docproc;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java
new file mode 100644
index 00000000000..c967eeedd20
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/AbstractConcreteDocumentFactory.java
@@ -0,0 +1,44 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.annotation.Annotation;
+import com.yahoo.document.datatypes.Struct;
+import com.yahoo.document.datatypes.StructuredFieldValue;
+import com.yahoo.yolean.Exceptions;
+
+/**
+ * Subtyped by factory classes for concrete document types. The factory classes are auto-generated
+ * by vespa-documentgen-plugin. This superclass is used to manage the factories in OSGI.
+ * @author vegardh
+ * @since 5.1
+ *
+ */
+public abstract class AbstractConcreteDocumentFactory extends com.yahoo.component.AbstractComponent {
+ public abstract Map<String, Class<? extends Document>> documentTypes();
+ public abstract Map<String, Class<? extends Struct>> structTypes();
+ public abstract Map<String, Class<? extends Annotation>> annotationTypes();
+
+ /**
+ * Used by the docproc framework to get an instance of a concrete document type without resorting to reflection in a bundle
+ *
+ * @return A concrete document instance
+ */
+ public com.yahoo.document.Document getDocumentCopy(java.lang.String type, com.yahoo.document.datatypes.StructuredFieldValue src, com.yahoo.document.DocumentId id) {
+ // Note: This method can't be abstract because it must work with older bundles where the ConcreteDocumentFactory may not implement it.
+ // It is overridden to not use reflection by newer bundles.
+ // The implementation here is not so good in bundles, since it instantiates the doc using reflection.
+ // TODO: for 6.0: make this method abstract and throw away the code below.
+ Class<? extends Document> concreteClass = documentTypes().get(type);
+ try {
+ Constructor<? extends Document> copyCon = concreteClass.getConstructor(StructuredFieldValue.class, DocumentId.class);
+ return copyCon.newInstance(src, id);
+ } catch (InvocationTargetException | NoSuchMethodException | InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException(Exceptions.toMessageString(e), e);
+ }
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/Accesses.java b/docproc/src/main/java/com/yahoo/docproc/Accesses.java
new file mode 100644
index 00000000000..99f9cb0073c
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/Accesses.java
@@ -0,0 +1,55 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Docprocs tagged with this will read and/or write annotations on the given field(s).
+ * @author vegardh
+ *
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Accesses {
+ Field[] value();
+
+ /**
+ * Describes the annotations produced and consumed on one field in a document
+ * @author vegardh
+ *
+ */
+ @Documented
+ @Target(ElementType.TYPE)
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Field {
+ /** The name of the document field */
+ String name();
+ /** The datatype of the field */
+ String dataType();
+ /** The trees of annotations that this docproc accesses on this field */
+ Tree[] annotations() default {};
+ String description();
+
+ /**
+ * Describes the annotations produced and consumed in one tree on a field
+ * @author vegardh
+ *
+ */
+ @Documented
+ @Target(ElementType.TYPE)
+ @Retention(RetentionPolicy.RUNTIME)
+ public @interface Tree {
+ /** The name of the tree */
+ String name() default "";
+ /** The annotation types that this docproc writes in this tree */
+ String[] produces() default {};
+ /** The annotation types that this docproc requires in this tree */
+ String[] consumes() default {};
+ }
+ }
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/Call.java b/docproc/src/main/java/com/yahoo/docproc/Call.java
new file mode 100644
index 00000000000..6d5b25b92b5
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/Call.java
@@ -0,0 +1,179 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.docproc.proxy.ProxyDocument;
+import com.yahoo.docproc.proxy.ProxyDocumentUpdate;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.statistics.Counter;
+import com.yahoo.statistics.Statistics;
+
+import java.util.List;
+
+/**
+ * A document processor to call - an item on a {@link com.yahoo.docproc.CallStack}.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a>
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class Call implements Cloneable {
+ private final DocumentProcessor processor;
+ private final Counter docCounter;
+ private final String docCounterName;
+ private final Counter procTimeCounter;
+ private final String procTimeCounterName;
+ private final Metric metric;
+
+ public Call(DocumentProcessor processor) {
+ this(processor, Statistics.nullImplementation, new NullMetric());
+ }
+
+ /** Creates a new call to a processor with no arguments.
+ * @param processor the document processor to call */
+ public Call(DocumentProcessor processor, Statistics manager, Metric metric) {
+ this(processor, "", manager, metric);
+ }
+
+ public Call(DocumentProcessor processor, String chainName, Statistics manager, Metric metric) {
+ this.processor = processor;
+ if (chainName == null) {
+ chainName = "";
+ }
+ chainName = chainName.replaceAll("[^\\p{Alnum}]", "_");
+ docCounterName = "docprocessor_" + chainName + "_" +
+ getDocumentProcessorId().stringValue().replaceAll("[^\\p{Alnum}]", "_") + "_documents";
+ procTimeCounterName = "docprocessor_" + chainName + "_" +
+ getDocumentProcessorId().stringValue().replaceAll("[^\\p{Alnum}]", "_") + "_proctime";
+ docCounter = new Counter(docCounterName, manager, false);
+ procTimeCounter = new Counter(procTimeCounterName, manager, false, null, true);
+ this.metric = metric;
+ }
+
+ @Override
+ public Object clone() {
+ try {
+ Call clone = (Call) super.clone();
+ return clone;
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Will not happen");
+ }
+ }
+
+ /**
+ * Returns the processor to call.
+ *
+ * @return a reference to the processor to call
+ */
+ public DocumentProcessor getDocumentProcessor() {
+ return processor;
+ }
+
+ /**
+ * Returns the ID of the processor to call.
+ *
+ * @return the ID of the processor to call
+ */
+ public ComponentId getDocumentProcessorId() {
+ return processor.getId();
+ }
+
+ /**
+ * The Document object the proc should work on. Normally the one in arguments, but could be a proxy object
+ * if schema mapping or @Accesses is in effect.
+ *
+ * <p>
+ * public for testing
+ */
+ public DocumentPut configDoc(DocumentProcessor docProc, DocumentPut documentPut) {
+ if (!docProc.getFieldMap().isEmpty() || docProc.hasAnnotations()) {
+ Document document = documentPut.getDocument();
+ document = new ProxyDocument(docProc, document, docProc.getDocMap(document.getDataType().getName()));
+
+ DocumentPut newDocumentPut = new DocumentPut(document);
+ newDocumentPut.setCondition(documentPut.getCondition());
+ documentPut = newDocumentPut;
+ }
+
+ return documentPut;
+ }
+
+ /**
+ * The DocumentUpdate object a processor should work on. The one in args, or schema mapped.
+ *
+ * @return a DocumentUpdate
+ */
+ private DocumentUpdate configDocUpd(DocumentProcessor proc, DocumentUpdate docU) {
+ if (proc.getFieldMap().isEmpty()) return docU;
+ return new ProxyDocumentUpdate(docU, proc.getDocMap(docU.getDocumentType().getName()));
+ }
+
+ private void schemaMapProcessing(Processing processing) {
+ final List<DocumentOperation> documentOperations = processing.getDocumentOperations();
+ for (int i = 0; i < documentOperations.size(); i++) {
+ DocumentOperation op = documentOperations.get(i);
+ if (op instanceof DocumentPut) {
+ documentOperations.set(i, configDoc(processor, (DocumentPut) op));
+ } else if (op instanceof DocumentUpdate) {
+ documentOperations.set(i, configDocUpd(processor, (DocumentUpdate) op));
+ }
+ }
+ }
+
+
+ private void unwrapSchemaMapping(Processing processing) {
+ final List<DocumentOperation> documentOperations = processing.getDocumentOperations();
+
+ for (int i = 0; i < documentOperations.size(); i++) {
+ DocumentOperation documentOperation = documentOperations.get(i);
+
+ if (documentOperation instanceof DocumentPut) {
+ DocumentPut putOperation = (DocumentPut) documentOperation;
+
+ if (putOperation.getDocument() instanceof DocumentOperationWrapper) {
+ DocumentOperationWrapper proxy = (DocumentOperationWrapper) putOperation.getDocument();
+ documentOperations.set(i, new DocumentPut(putOperation, ((DocumentPut)proxy.getWrappedDocumentOperation()).getDocument()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Call the DocumentProcessor of this call.
+ *
+ * @param processing the Processing object to use
+ * @return the progress of the DocumentProcessor that was called
+ */
+ public DocumentProcessor.Progress call(Processing processing) {
+ try {
+ int numDocs = processing.getDocumentOperations().size();
+ schemaMapProcessing(processing);
+ long startTime = SystemTimer.INSTANCE.milliTime();
+ DocumentProcessor.Progress retval = processor.process(processing);
+ incrementProcTime(SystemTimer.INSTANCE.milliTime() - startTime);
+ incrementDocs(numDocs);
+ return retval;
+ } finally {
+ unwrapSchemaMapping(processing);
+ }
+ }
+
+ public String toString() {
+ return "call to class " + processor.getClass().getName() + " (id: " + getDocumentProcessorId() + ")";
+ }
+
+ private void incrementDocs(long increment) {
+ docCounter.increment(increment);
+ metric.add(docCounterName, increment, null);
+ }
+
+ private void incrementProcTime(long increment) {
+ procTimeCounter.increment(increment);
+ metric.add(procTimeCounterName, increment, null);
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/CallStack.java b/docproc/src/main/java/com/yahoo/docproc/CallStack.java
new file mode 100644
index 00000000000..8fbe72577d3
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/CallStack.java
@@ -0,0 +1,402 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.statistics.Statistics;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * <p>
+ * A stack of the processors to call next in this processing. To push which
+ * processor to call next, call addNext, to get and remove the next processor,
+ * call pop.
+ * </p>
+ *
+ * <p>
+ * This is not thread safe.
+ * </p>
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class CallStack {
+
+ /** The name of this stack, or null if it is not named */
+ private String name = null;
+
+ /** The Call objects of this stack */
+ private final List<Call> elements = new java.util.LinkedList<>();
+
+ /** The last element popped from the call stack, if any */
+ private Call lastPopped = null;
+
+ /** Used for creating counters in Call */
+ private final Statistics statistics;
+
+ /** Used for metrics in Call */
+ private final Metric metric;
+
+ public CallStack() {
+ this(null, Statistics.nullImplementation, new NullMetric());
+ }
+
+ public CallStack(String name) {
+ this(name, Statistics.nullImplementation, new NullMetric());
+ }
+
+ /** Creates an empty stack */
+ public CallStack(Statistics statistics, Metric metric) {
+ this(null, statistics, metric);
+ }
+
+ /** Creates an empty stack with a name */
+ public CallStack(final String name, Statistics manager, Metric metric) {
+ this.name = name;
+ this.statistics = manager;
+ this.metric = metric;
+ }
+
+ /**
+ * Creates a stack from another stack (starting at the next of the given
+ * callstack) This does a deep copy of the stack.
+ */
+ public CallStack(final CallStack stackToCopy) {
+ name = stackToCopy.name;
+ for (final Iterator<Call> i = stackToCopy.iterator(); i.hasNext();) {
+ final Call callToCopy = i.next();
+ elements.add((Call) callToCopy.clone());
+ }
+ this.statistics = stackToCopy.statistics;
+ this.metric = stackToCopy.metric;
+ }
+
+ /**
+ * Creates a stack (with a given name) based on a collection of document processors, which are added to the stack
+ * in the iteration order of the collection.
+ *
+ * @param name the name of the stack
+ * @param docprocs the document processors to call
+ */
+ public CallStack(final String name, Collection<DocumentProcessor> docprocs, Statistics manager, Metric metric) {
+ this(name, manager, metric);
+ for (DocumentProcessor docproc : docprocs) {
+ addLast(docproc);
+ }
+ }
+
+ /** Returns the name of this stack, or null if it is not named */
+ public String getName() {
+ return name;
+ }
+
+ /** Sets the name of this stack */
+ public void setName(final String name) {
+ this.name = name;
+ }
+
+ /**
+ * Push an element as the <i>next</i> element on this stack
+ *
+ * @return this for convenience
+ */
+ public CallStack addNext(final Call call) {
+ elements.add(0, call);
+ return this;
+ }
+
+ /**
+ * Push an element as the <i>next</i> element on this stack
+ *
+ * @return this for convenience
+ */
+ public CallStack addNext(final DocumentProcessor processor) {
+ return addNext(new Call(processor, name, statistics, metric));
+ }
+
+ /**
+ * Push multiple elements as the <i>next</i> elements on this stack
+ *
+ * @return this for convenience
+ */
+ public CallStack addNext(final CallStack callStack) {
+ elements.addAll(0, callStack.elements);
+ return this;
+ }
+
+ /**
+ * Adds an element as the <i>last</i> element on this stack
+ *
+ * @return this for convenience
+ */
+ public CallStack addLast(final Call call) {
+ elements.add(call);
+ return this;
+ }
+
+ /**
+ * Adds an element as the <i>last</i> element on this stack
+ *
+ * @return this for convenience
+ */
+ public CallStack addLast(final DocumentProcessor processor) {
+ return addLast(new Call(processor, name, statistics, metric));
+ }
+
+ /**
+ * Adds multiple elements as the <i>last</i> elements on this stack
+ *
+ * @return this for convenience
+ */
+ public CallStack addLast(final CallStack callStack) {
+ elements.addAll(callStack.elements);
+ return this;
+ }
+
+ /**
+ * Adds an element just before the first occurence of some other element on
+ * the stack. This can not be called during an iteration.
+ *
+ * @param before
+ * the call to add this before. If this call is not present (the
+ * same object instance), new processor is added as the last
+ * element
+ * @param call the call to add
+ * @return this for convenience
+ */
+ public CallStack addBefore(final Call before, final Call call) {
+ final int insertPosition = elements.indexOf(before);
+ if (insertPosition < 0) {
+ addLast(call);
+ } else {
+ elements.add(insertPosition, call);
+ }
+ return this;
+ }
+
+ /**
+ * Adds an element just before the first occurence of some element on the
+ * stack. This can not be called during an iteration.
+ *
+ * @param before
+ * the call to add this before. If this call is not present (the
+ * same object instance), the new processor is added as the last
+ * element
+ * @param processor the processor to add
+ * @return this for convenience
+ */
+ public CallStack addBefore(final Call before, DocumentProcessor processor) {
+ return addBefore(before, new Call(processor, name, statistics, metric));
+ }
+
+ /**
+ * Adds multiple elements just before the first occurence of some element on
+ * the stack. This can not be called during an iteration.
+ *
+ * @param before
+ * the call to add this before. If this call is not present (the
+ * same object instance), the new processor is added as the last
+ * element
+ * @param callStack
+ * the calls to add
+ * @return this for convenience
+ */
+ public CallStack addBefore(final Call before, final CallStack callStack) {
+ final int insertPosition = elements.indexOf(before);
+ if (insertPosition < 0) {
+ addLast(callStack);
+ } else {
+ elements.addAll(insertPosition, callStack.elements);
+ }
+ return this;
+ }
+
+ /**
+ * Adds an element just after the first occurence of some other element on
+ * the stack. This can not be called during an iteration.
+ *
+ * @param after
+ * the call to add this before. If this call is not present, (the
+ * same object instance), the new processor is added as the last
+ * element
+ * @param call
+ * the call to add
+ * @return this for convenience
+ */
+ public CallStack addAfter(final Call after, final Call call) {
+ final int insertPosition = elements.indexOf(after);
+ if (insertPosition < 0) {
+ addLast(call);
+ } else {
+ elements.add(insertPosition + 1, call);
+ }
+ return this;
+ }
+
+ /**
+ * Adds an element just after the first occurence of some other element on
+ * the stack. This can not be called during an iteration.
+ *
+ * @param after
+ * the call to add this after. If this call is not present, (the
+ * same object instance), the new processor is added as the last
+ * element
+ * @param processor
+ * the processor to add
+ * @return this for convenience
+ */
+ public CallStack addAfter(final Call after, final DocumentProcessor processor) {
+ return addAfter(after, new Call(processor, name, statistics, metric));
+ }
+
+ /**
+ * Adds multiple elements just after another given element on the stack.
+ * This can not be called during an iteration.
+ *
+ * @param after
+ * the call to add this before. If this call is not present, (the
+ * same object instance), the new processor is added as the last
+ * element
+ * @param callStack
+ * the calls to add
+ * @return this for convenience
+ */
+ public CallStack addAfter(final Call after, final CallStack callStack) {
+ final int insertPosition = elements.indexOf(after);
+ if (insertPosition < 0) {
+ addLast(callStack);
+ } else {
+ elements.addAll(insertPosition + 1, callStack.elements);
+ }
+ return this;
+ }
+
+ /**
+ * Removes the given call. Does nothing if the call is not present.
+ *
+ * @param call
+ * the call to remove
+ * @return this for convenience
+ */
+ public CallStack remove(final Call call) {
+ for (final ListIterator<Call> i = iterator(); i.hasNext();) {
+ final Call current = i.next();
+ if (current == call) {
+ i.remove();
+ }
+ }
+ return this;
+ }
+
+ /**
+ * Returns whether this stack has this call (left)
+ *
+ * @param call
+ * the call to check
+ * @return true if the call is present, false otherwise
+ */
+ public boolean contains(final Call call) {
+ for (final ListIterator<Call> i = iterator(); i.hasNext();) {
+ final Call current = i.next();
+ if (current == call) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns the next call to this processor id, or null if no such calls are
+ * left
+ */
+ public Call findCall(final ComponentId processorId) {
+ for (final Iterator<Call> i = iterator(); i.hasNext();) {
+ final Call call = i.next();
+ if (call.getDocumentProcessorId().equals(processorId)) {
+ return call;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns the next call to this processor, or null if no such calls are
+ * left
+ */
+ public Call findCall(final DocumentProcessor processor) {
+ return findCall(processor.getId());
+ }
+
+ /**
+ * Returns and removes the next element, or null if there are no more elements
+ */
+ public Call pop() {
+ if (elements.isEmpty()) return null;
+ lastPopped = elements.remove(0);
+ return lastPopped;
+ }
+
+ /**
+ * Returns the next element without removing it, or null if there are no
+ * more elements
+ */
+ public Call peek() {
+ if (elements.isEmpty()) return null;
+ return elements.get(0);
+ }
+
+ /**
+ * Returns the element that was last popped from this stack, or null if none
+ * have been popped or the stack is empty
+ */
+ public Call getLastPopped() {
+ return lastPopped;
+ }
+
+ public void clear() {
+ elements.clear();
+ }
+
+ /**
+ * Returns a modifiable ListIterator over all the remaining elements of this
+ * stack, starting by the next element
+ */
+ public ListIterator<Call> iterator() {
+ return elements.listIterator();
+ }
+
+ /** Returns the number of remainnig elements in this stack */
+ public int size() {
+ return elements.size();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuffer buffer = new StringBuffer();
+ buffer.append("callstack");
+ if (name != null) {
+ buffer.append(" ");
+ buffer.append(name);
+ }
+ buffer.append(":");
+ for (final Iterator<Call> i = iterator(); i.hasNext();) {
+ buffer.append("\n");
+ buffer.append(" ");
+ buffer.append(i.next().toString());
+ }
+ buffer.append("\n");
+ return buffer.toString();
+ }
+
+ public Statistics getStatistics() {
+ return statistics;
+ }
+
+ public Metric getMetric() {
+ return metric;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java b/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java
new file mode 100644
index 00000000000..225f62ed2ef
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/DocprocExecutor.java
@@ -0,0 +1,185 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.json.JsonWriter;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.log.LogLevel;
+import com.yahoo.statistics.Counter;
+import com.yahoo.text.Utf8;
+
+import java.util.Collections;
+import java.util.logging.Logger;
+
+/**
+ * An executor executed incoming processings on its CallStack
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocprocExecutor {
+
+ private final static String METRIC_NAME_DOCUMENTS_PROCESSED = "documents_processed";
+
+ private static final Logger log = Logger.getLogger(DocprocExecutor.class.getName());
+
+ private final String name;
+ private final String docCounterName;
+ private final Counter docCounter;
+ private final Metric metric;
+ private Metric.Context context;
+ private final CallStack callStack;
+
+ /**
+ * Creates a new named DocprocExecutor with the given CallStack.
+ *
+ * @param name the name of this executor
+ * @param callStack the chain of document processors this executor shall execute on processings
+ */
+ public DocprocExecutor(String name, CallStack callStack) {
+ this.name = name;
+ String chainDimension = name;
+ if (name != null) {
+ chainDimension = name.replaceAll("[^\\p{Alnum}]", "_");
+ docCounterName = "chain_" + name.replaceAll("[^\\p{Alnum}]", "_") + "_documents";
+ } else {
+ //name is null
+ docCounterName = "chain_" + name + "_documents";
+ }
+ docCounter = new Counter(docCounterName, callStack.getStatistics(), false);
+ this.metric = callStack.getMetric();
+ this.callStack = callStack;
+ this.callStack.setName(name);
+ this.context = this.metric.createContext(Collections.singletonMap("chain", chainDimension));
+ }
+
+ /**
+ * Creates a new named DocprocExecutor, with the same instance variables as the given executor,
+ * but a new call stack.
+ *
+ * @param oldExecutor the executor to inherit the instance variables from, sans call stack.
+ * @param callStack the call stack to use.
+ */
+ public DocprocExecutor(DocprocExecutor oldExecutor, CallStack callStack) {
+ this.name = oldExecutor.name;
+ this.docCounterName = oldExecutor.docCounterName;
+ this.docCounter = oldExecutor.docCounter;
+ this.metric = oldExecutor.metric;
+ this.context = oldExecutor.context;
+ this.callStack = callStack;
+ }
+
+ public CallStack getCallStack() {
+ return callStack;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ private void incrementNumDocsProcessed(int num) {
+ docCounter.increment(num);
+ metric.add(docCounterName, num, null);
+ metric.add(METRIC_NAME_DOCUMENTS_PROCESSED,num,this.context);
+ }
+
+ private void incrementNumDocsProcessed(Processing processing) {
+ int increment = processing.getNumDocsToBeProcessed();
+ if (increment != 0) {
+ incrementNumDocsProcessed(increment);
+ }
+ }
+
+ /**
+ * Processes a given Processing through the CallStack of this executor.
+ *
+ * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null.
+ * @return a Progress; if this is LATER, the Processing is not done and must be reprocessed later.
+ * @throws RuntimeException if a document processor throws an exception during processing.
+ * @see com.yahoo.docproc.Processing
+ */
+ public DocumentProcessor.Progress process(Processing processing) {
+ processing.setServiceName(getName());
+ if (processing.callStack() == null) {
+ processing.setCallStack(new CallStack(getCallStack()));
+ }
+
+ DocumentProcessor.Progress progress = DocumentProcessor.Progress.DONE;
+ //metrics stuff:
+ //TODO: Note that this is *wrong* in case of Progress.LATER, documents are then counted several times until the Processing is DONE or FAILED.
+ incrementNumDocsProcessed(processing);
+ do {
+ Call call = processing.callStack().pop();
+ if (call == null) {
+ // No more processors - done
+ return progress;
+ }
+
+ progress = DocumentProcessor.Progress.DONE;
+ //might throw exception, which is OK:
+ progress = call.call(processing);
+
+ if (log.isLoggable(LogLevel.SPAM)) {
+ logProgress(processing, progress, call);
+ }
+
+ if (DocumentProcessor.Progress.LATER.equals(progress)) {
+ processing.callStack().addNext(call);
+ return progress;
+ }
+ } while (DocumentProcessor.Progress.DONE.equals(progress));
+ return progress;
+ }
+
+ private void logProgress(Processing processing, DocumentProcessor.Progress progress, Call call) {
+ StringBuilder message = new StringBuilder();
+ boolean first = true;
+ message.append(call.getDocumentProcessorId()).append(" of class ")
+ .append(call.getDocumentProcessor().getClass().getSimpleName()).append(" returned ").append(progress)
+ .append(" for the documents: [");
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (first) {
+ first = false;
+ } else {
+ message.append(", ");
+ }
+ if (op instanceof DocumentPut) {
+ message.append(Utf8.toString(JsonWriter.toByteArray(((DocumentPut) op).getDocument())));
+ } else {
+ message.append(op.toString());
+ }
+ }
+ message.append("]");
+ log.log(LogLevel.SPAM, message.toString());
+ }
+
+ /**
+ * Processes a given Processing through the CallStack of this executor. Note that if a DocumentProcessor
+ * returns a LaterProgress for this processing, it will be re-processed (after waiting the specified delay given
+ * by the LaterProgress), until done or failed.
+ *
+ * @param processing the Processing to process. The CallStack of the Processing will be set to a clone of the CallStack of this executor, iff. it is currently null.
+ * @return a Progress; this is never a LaterProgress.
+ * @throws RuntimeException if a document processor throws an exception during processing, or this thread is interrupted while waiting.
+ * @see com.yahoo.docproc.Processing
+ * @see com.yahoo.docproc.DocumentProcessor.Progress
+ * @see com.yahoo.docproc.DocumentProcessor.LaterProgress
+ */
+ public DocumentProcessor.Progress processUntilDone(Processing processing) {
+ DocumentProcessor.Progress progress;
+ while (true) {
+ progress = process(processing);
+ if (!(progress instanceof DocumentProcessor.LaterProgress)) {
+ break;
+ }
+ DocumentProcessor.LaterProgress later = (DocumentProcessor.LaterProgress) progress;
+ try {
+ Thread.sleep(later.getDelay());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ return progress;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/DocprocService.java b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java
new file mode 100644
index 00000000000..3d113fecfc5
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/DocprocService.java
@@ -0,0 +1,373 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.ComponentId;
+import com.yahoo.docproc.proxy.SchemaMap;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentTypeManager;
+
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * <p>The document processing service.
+ * Use this to set up a document processing chain and to
+ * process documents using that chain. Note that there may
+ * be multiple named instances of this service in the same
+ * runtime. The default service is called "default" and is always present.</p>
+ *
+ * <p>To create a server which receives documents from the network
+ * and processes them, have a look at com.yahoo.docproc.server.Server.</p>
+ *
+ * <p>This class is thread safe.</p>
+ *
+ * @author bratseth
+ */
+public class DocprocService extends AbstractComponent {
+
+ private static Logger log = Logger.getLogger(DocprocService.class.getName());
+ private volatile DocprocExecutor executor;
+ /**
+ * The processings currently in progress at this service
+ */
+ private final LinkedBlockingQueue<Processing> queue;
+ /** The current state of this service */
+ private boolean inService = false;
+ /** The current state of this service */
+ private boolean acceptingNewProcessings = true;
+ public static SchemaMap schemaMap = new SchemaMap(null);
+ private DocumentTypeManager documentTypeManager = null;
+
+ public DocprocService(ComponentId id) {
+ super(id);
+ queue = new LinkedBlockingQueue<>();
+ }
+
+ /**
+ * Creates a new docproc service, which is set to be in service.
+ *
+ * @param id the component id of the new service.
+ * @param stack the call stack to use.
+ * @param mgr the document type manager to use.
+ */
+ public DocprocService(ComponentId id, CallStack stack, DocumentTypeManager mgr) {
+ this(id);
+ setCallStack(stack);
+ setDocumentTypeManager(mgr);
+ setInService(true);
+ }
+
+ /**
+ * Creates a service with a name with an unbounded input queue. If the given name is null or the empty string,
+ * it will become the name "default".
+ */
+ public DocprocService(String name) {
+ this(new ComponentId(name, null));
+ }
+
+ public DocumentTypeManager getDocumentTypeManager() {
+ return documentTypeManager;
+ }
+
+ public void setDocumentTypeManager(DocumentTypeManager documentTypeManager) {
+ this.documentTypeManager = documentTypeManager;
+ }
+
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ /**
+ * Returns the DocprocExecutor of this DocprocService. This can be used to
+ * synchronously process one Processing.
+ *
+ * @return the DocprocExecutor of this DocprocService, or null if a CallStack has not yet been set.
+ */
+ public DocprocExecutor getExecutor() {
+ return executor;
+ }
+
+ private void setExecutor(DocprocExecutor executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * Sets whether this should currently perform any processing.
+ * New processings will be accepted also when this is out of service,
+ * but no processing will happen when it is out of service.
+ */
+ public void setInService(boolean inService) {
+ this.inService = inService;
+ }
+
+ /**
+ * Returns true if this is currently processing incoming processings
+ * (in service), or false if they are just queued up (out of service).
+ * By default, this is out of service.
+ */
+ public boolean isInService() {
+ return inService;
+ }
+
+ /**
+ * Returns true if this service currently accepts new incoming processings via process(...).&nbsp;Default is true.
+ *
+ * @return true if accepting new incoming processings
+ */
+ public boolean isAcceptingNewProcessings() {
+ return acceptingNewProcessings;
+ }
+
+ /**
+ * Sets whether this service should accept new incoming processings via process(...).
+ *
+ * @param acceptingNewProcessings true if service should accept new incoming processings
+ */
+ public void setAcceptingNewProcessings(boolean acceptingNewProcessings) {
+ this.acceptingNewProcessings = acceptingNewProcessings;
+ }
+
+ public String getName() {
+ return getId().stringValue();
+ }
+
+ /**
+ * Returns the processing chain of this service. This stack can not be modified.
+ * To change the stack, set a new one.
+ * TODO: Enforce unmodifiability
+ */
+ public CallStack getCallStack() {
+ DocprocExecutor ex = getExecutor();
+ return (ex == null) ? null : ex.getCallStack();
+ }
+
+ /**
+ * Sets a new processing stack for this service. This will be the Prototype
+ * for the call stacks of individual processings in this service
+ */
+ public void setCallStack(CallStack stack) {
+ DocprocExecutor ex = ((getExecutor() == null) ? new DocprocExecutor(getName(), stack) : new DocprocExecutor(getExecutor(), stack));
+ setExecutor(ex);
+ }
+
+ /**
+ * Asynchronously process the given Processing using the processing
+ * chain of this service, and call the specified ProcessingEndpoint when done.
+ *
+ * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full
+ * @throws IllegalStateException if this DocprocService is not accepting new incoming processings
+ */
+ public void process(Processing processing, ProcessingEndpoint endp) {
+ processing.setServiceName(getName());
+ processing.setCallStack(new CallStack(getCallStack()));
+ processing.setEndpoint(endp);
+ addProcessing(processing);
+ }
+
+ /**
+ * Asynchronously process the given Processing using the processing
+ * chain of this service
+ *
+ * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full
+ * @throws IllegalStateException if this DocprocService is not accepting new incoming processings
+ */
+ public void process(Processing processing) {
+ process(processing, null);
+ }
+
+ /**
+ * Asynchronously process the given document put or document update using the processing
+ * chain of this service, and call the specified ProcessingEndpoint when done.
+ *
+ * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full
+ * @throws IllegalStateException if this DocprocService is not accepting new incoming processings
+ */
+ public void process(DocumentOperation documentOperation, ProcessingEndpoint endp) {
+ addProcessing(new Processing(getName(), documentOperation, new CallStack(getCallStack()), endp));
+ }
+
+ /**
+ * Asynchronously process the given document operation using the processing
+ * chain of this service.
+ *
+ * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full
+ * @throws IllegalStateException if this DocprocService is not accepting new incoming processings
+ */
+ public void process(DocumentOperation documentOperation) {
+ process(documentOperation, null);
+ }
+
+ /**
+ * Asynchronously process the given document operations as one unit
+ * using the processing chain of this service,
+ * and call the specified ProcessingEndpoint when done.
+ *
+ * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full
+ * @throws IllegalStateException if this DocprocService is not accepting new incoming processings
+ */
+ public void processDocumentOperations(List<DocumentOperation> documentOperations, ProcessingEndpoint endp) {
+ addProcessing(Processing.createProcessingFromDocumentOperations(getName(), documentOperations, new CallStack(getCallStack()), endp));
+ }
+
+ /**
+ * Asynchronously process the given document operations as one unit
+ * using the processing chain of this service.
+ *
+ * @throws RuntimeException caused by a QueueFullException if this DocprocService has a bounded input queue and the queue is full
+ * @throws IllegalStateException if this DocprocService is not accepting new incoming processings
+ */
+ public void processDocumentOperations(List<DocumentOperation> documentOperations) {
+ processDocumentOperations(documentOperations, null);
+ }
+
+ private void addProcessing(Processing processing) {
+ if (!isAcceptingNewProcessings()) {
+ throw new IllegalStateException("Docproc service " + getName() + " is not accepting new incoming processings. Cannot add " + processing + " ");
+ }
+
+ if (!queue.offer(processing)) {
+ throw new RejectedExecutionException("Docproc service " + getName() + " is busy, please try later");
+ }
+ }
+
+ /**
+ * <p>Do some work in this service. This will perform some processing and return
+ * in a "short" while, as long as individual processors also returns.</p>
+ *
+ * <p>This method is thread safe - multiple threads may call doWork at any time.
+ * Note that processors
+ * should be non-blocking, so multiple threads should be used primarily to
+ * utilize multiple processors.</p>
+ *
+ * @return true if some work was performed, false if no work could be performed
+ * at this time, because there are no outstanding processings, or because
+ * this is out of service. Note that since processings may arrive or be put
+ * back by another thread at any time, this return value does not mean
+ * that no work will be available if doWork as called again immediately.
+ */
+ public boolean doWork() {
+ try {
+ return doWork(false);
+ } catch (InterruptedException e) {
+ //will never happen because we are not blocking
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean doWork(boolean blocking) throws InterruptedException {
+ Processing processing;
+ if (blocking) {
+ processing = queue.take();
+ } else {
+ processing = queue.poll();
+ }
+
+ if (processing == null) {
+ //did no work, returning nothing to queue
+ return false;
+ }
+ if (!isInService()) {
+ //did no work, returning processing (because it's not empty)
+ queue.add(processing); //always successful, since queue is unbounded
+ return false;
+ }
+
+ boolean remove = workOn(processing); //NOTE: Exceptions are handled in here, but not Errors
+ if (!remove) {
+ queue.add(processing); //always successful, since queue is unbounded
+ }
+ return true;
+ //NOTE: We *could* have called returnProcessing() in a finally block here, but we don't
+ //want that, since the only thing being thrown out here is Errors, and then the Processing
+ //can just disappear instead
+ }
+
+ /**
+ * <p>Do some work in this service. This will perform some processing and return
+ * in a "short" while, as long as individual processors also returns.&nbsp;Note that
+ * if the internal queue is empty when this method is called, it will block until
+ * some work is submitted by a call to process() by another thread.</p>
+ *
+ * <p>This method is thread safe - multiple threads may call doWorkBlocking at any time.
+ * Note that processors
+ * should be non-blocking, so multiple threads should be used primarily to
+ * utilize multiple processors.</p>
+ *
+ * @return always true, since if the internal queue is empty when this method is
+ * called, it will block until some work is submitted by a call to
+ * process() by another thread.
+ * @throws InterruptedException if a call to this method is interrupted while waiting for data to become available
+ */
+ public boolean doWorkBlocking() throws InterruptedException {
+ return doWork(true);
+ }
+
+ /**
+ * Do some work on this processing. Must only be called from the worker thread.
+ *
+ * @return true if this processing should be removed, false if there is more work to do on it later
+ * @throws NoCallStackException if no CallStack has been set on this executor.
+ */
+ boolean workOn(Processing processing) {
+ DocprocExecutor ex = getExecutor();
+ if (ex == null) {
+ throw new NoCallStackException();
+ }
+
+ DocumentProcessor.Progress progress;
+
+ try {
+ progress = ex.process(processing);
+ } catch (Exception e) {
+ processingFailed(processing, processing + " failed", e);
+ return true;
+ }
+
+ if (DocumentProcessor.Progress.DONE.equals(progress)) {
+ //notify endpoint
+ ProcessingEndpoint recv = processing.getEndpoint();
+ if (recv != null) {
+ recv.processingDone(processing);
+ }
+ return true;
+ } else if (DocumentProcessor.Progress.FAILED.equals(progress)) {
+ processingFailed(processing, processing + " failed at " + processing.callStack().getLastPopped(), null);
+ return true;
+ } else if (DocumentProcessor.Progress.PERMANENT_FAILURE.equals(progress)) {
+ processingFailed(processing,
+ processing + " failed PERMANENTLY at " + processing.callStack().getLastPopped() + ", disabling processing service.", null);
+ setInService(false);
+ return true;
+ } else {
+ //LATER:
+ return false;
+ }
+ }
+
+ private void processingFailed(Processing processing, String errorMsg, Exception e) {
+ if (e != null) {
+ if (e instanceof HandledProcessingException) {
+ errorMsg += ". Error message: " + e.getMessage();
+ log.log(Level.WARNING, errorMsg);
+ log.log(Level.FINE, "Chained exception:", e);
+ } else {
+ log.log(Level.WARNING, errorMsg, e);
+ }
+ } else {
+ log.log(Level.WARNING, errorMsg);
+ }
+
+ //notify endpoint
+ ProcessingEndpoint recv = processing.getEndpoint();
+ if (recv != null) {
+ recv.processingFailed(processing, e);
+ }
+ }
+
+ private class NoCallStackException extends RuntimeException {
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java b/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java
new file mode 100644
index 00000000000..560022dca1b
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/DocumentOperationWrapper.java
@@ -0,0 +1,11 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DocumentOperation;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public interface DocumentOperationWrapper {
+ public DocumentOperation getWrappedDocumentOperation();
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java
new file mode 100644
index 00000000000..1ad745ac1b7
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java
@@ -0,0 +1,183 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.Version;
+import com.yahoo.component.chain.ChainedComponent;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Logger;
+
+/**
+ * <p>A document processor is a component which performs some
+ * operation on a document or document update. Document processors are asynchronous,
+ * they may request some data and then return. The processing framework
+ * is responsible for calling processors again at unspecified times
+ * until they are done processing the document or document update.</p>
+ *
+ * <p>Document processor instances
+ * are chained together by the framework to realize a complete processing pipeline.
+ * The processing chain is represented by the processor instances themselves, see
+ * getNext/setNext. Document processors may optionally control the routing
+ * through the chain by setting the next processor on ongoing processings.</p>
+ *
+ * <p>A processing may contain one or multiple documents or document updates. Document processors
+ * may optionally handle collections of processors in some other way than just
+ * processing each one in order.</p>
+ *
+ * <p>A document processor <i>must</i> have an empty constructor. When instantiated
+ * from Vespa config (as opposed to being instantiated programmatically in a stand-alone
+ * Docproc system), the framework is responsible for configuring the processor using
+ * setConfig(). If a document processor wants to do some initial setup after configuration
+ * has been set, but before it has begun processing documents or document updates, it should
+ * override initialize(). </p>
+ *
+ * <p>Document processors must be thread safe. To ensure this, make sure that
+ * access to any mutable, thread-unsafe state held in a field by the processor is
+ * synchronized.</p>
+ *
+ * @author <a href="bratseth@yahoo-inc.com">Jon S Bratseth</a>
+ */
+public abstract class DocumentProcessor extends ChainedComponent {
+ static Logger log = Logger.getLogger(DocprocService.class.getName());
+
+ /** The number of instances of this processor */
+ private static int instanceCounter = 1;
+
+ /** Schema map for doctype-fieldnames */
+ private Map<Pair<String,String>, String> fieldMap = new HashMap<>();
+
+ /** For a doc type, the actual field name mapping to do */
+ // TODO how to flush this when reconfig of schemamapping? Must solve
+ private Map<String, Map<String, String>> docMapCache = new HashMap<>();
+
+ final boolean hasAnnotations;
+
+ public DocumentProcessor() {
+ hasAnnotations = getClass().getAnnotation(Accesses.class) != null;
+ }
+
+ final boolean hasAnnotations() {
+ return hasAnnotations;
+ }
+
+ /**
+ * Processes a processing, which can contain zero or more document bases. The implementing document processor
+ * is free to modify, replace or delete elements in the list inside processing.
+ *
+ * @param processing the processing to process
+ * @return the outcome of this processing
+ */
+ public abstract Progress process(Processing processing);
+
+ public String toString() {
+ return "processor " + getId().stringValue();
+ }
+
+ /** An enumeration of possible results of calling a process method */
+ public static class Progress {
+
+ /** Returned by a processor when it is done with a processing */
+ public static final Progress DONE = new Progress("done");
+
+ /**
+ * Returned by a processor when it should be called again later
+ * for the same processing
+ */
+ public static final Progress LATER = new LaterProgress();
+
+ /**
+ * Returned by a processor when a processing has failed
+ * and it should not be called again for this processing.
+ */
+ public static final Progress FAILED = new Progress("failed");
+
+ /**
+ * Returned by a processor when processing has permanently failed,
+ * so that the document processing service should disable itself until
+ * reconfigured or restarted.
+ */
+ public static final Progress PERMANENT_FAILURE = new Progress("permanent_failure");
+
+ private String name;
+
+ protected Progress(String name) {
+ this.name = name;
+ }
+
+ public static Progress later(long delay) {
+ return new LaterProgress(delay);
+ }
+
+ public String toString() {
+ return name;
+ }
+
+ public boolean equals(Object object) {
+ return object instanceof Progress && ((Progress) object).name.equals(this.name);
+ }
+
+ public int hashCode() {
+ return name.hashCode();
+ }
+ }
+
+ public static final class LaterProgress extends Progress {
+ private final long delay;
+ public static final long DEFAULT_LATER_DELAY = 20; //ms
+
+ private LaterProgress() {
+ this(DEFAULT_LATER_DELAY);
+ }
+
+ private LaterProgress(long delay) {
+ super("later");
+ this.delay = delay;
+ }
+
+ public long getDelay() {
+ return delay;
+ }
+ }
+
+ /** Sets the schema map for field names */
+ public void setFieldMap(Map<Pair<String, String>, String> fieldMap) {
+ this.fieldMap = fieldMap;
+
+ }
+
+ /** Schema map for field names
+ * (doctype,from)→to
+ *
+ */
+ public Map<Pair<String, String>, String> getFieldMap() {
+ return fieldMap;
+ }
+
+ public Map<String, String> getDocMap(String docType) {
+ Map<String, String> cached = docMapCache.get(docType);
+ if (cached!=null) {
+ return cached;
+ }
+ Map<String, String> ret = new HashMap<>();
+ for (Entry<Pair<String, String>, String> e : fieldMap.entrySet()) {
+ // Remember to include tuple if doctype is unset in mapping
+ if (docType.equals(e.getKey().getFirst()) || e.getKey().getFirst()==null || "".equals(e.getKey().getFirst())) {
+ ret.put(e.getKey().getSecond(), e.getValue());
+ }
+ }
+ docMapCache.put(docType, ret);
+ return ret;
+ }
+
+ private void dumpMap(Map<?, String> m) {
+ System.out.print("[");
+ for (Map.Entry<?, String> e : m.entrySet()) {
+ System.out.print("("+e.getKey()+"->"+e.getValue()+")");
+ }
+ System.out.print("]\n");
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java b/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java
new file mode 100644
index 00000000000..33636cf17ab
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/HandledProcessingException.java
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+/**
+ * Exception generated by known bad input in a docproc. Will cause only message to be logged,
+ * not stacktrace.
+ *
+ * @author <a href="mailto:mathiasm@yahoo-inc.com">Mathias M\u00F8lster Lidal</a>
+ */
+public class HandledProcessingException extends RuntimeException {
+
+ public HandledProcessingException(String message) {
+ super(message);
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/Processing.java b/docproc/src/main/java/com/yahoo/docproc/Processing.java
new file mode 100644
index 00000000000..4f18a8372d8
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/Processing.java
@@ -0,0 +1,286 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.document.DocumentOperation;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A document processing. This contains the document(s) or document update(s) to process,
+ * a map of processing context data and the processing instance to
+ * invoke the next time any work needs to be done on this processing.
+ *
+ * @author bratseth
+ */
+public class Processing {
+ /**
+ * The name of the service which owns this processing.
+ * Null is the same as "default"
+ */
+ private String service = null;
+
+ /** The processors to call the next work is done on this processing */
+ private CallStack callStack = null;
+
+ /** The collection of documents or document updates processed by this. This is never null */
+ private List<DocumentOperation> documentOperations;
+
+ /**
+ * Documents or document updates which should be added to <code>documents</code> before
+ * the next access, or null if documents or document updates have never been added to
+ * this processing
+ */
+ private List<DocumentOperation> documentsToAdd = null;
+
+ /** The processing context variables */
+ private Map<String, Object> context = null;
+
+ /** The endpoint of this processing. */
+ private ProcessingEndpoint endpoint = null;
+
+ /** The registry of docproc services. */
+ ComponentRegistry<DocprocService> docprocServiceRegistry = null;
+ private boolean getNumDocsCalled = false;
+
+ /**
+ * Create a Processing with no documents. Useful with DocprocService.process(Processing).
+ * Note that the callstack is initially empty when using this constructor (but it is
+ * set by DocprocService.process(Processing).)
+ */
+ public Processing() {
+ this.documentOperations = new ArrayList<>(1);
+ }
+
+ /**
+ * Create a Processing from the given document operation
+ */
+ public static Processing of(DocumentOperation documentOperation) {
+ return new Processing(documentOperation);
+ }
+
+ /**
+ * Create a Processing from the given document operation
+ * @deprecated Use {@link #of(DocumentOperation)} instead
+ */
+ @Deprecated
+ @SuppressWarnings("unused")
+ public static Processing fromDocumentOperation(DocumentOperation documentOperation) {
+ return Processing.of(documentOperation);
+ }
+
+ private Processing(DocumentOperation documentOperation) {
+ this();
+ addDocumentOperation(documentOperation);
+ }
+
+ /**
+ * Create a processing with one document. The given document put or document update will be the single
+ * element in <code>documentOperations</code>.
+ *
+ * @param service the unique name of the service processing this
+ * @param documentOperation document operation (DocumentPut or DocumentUpdate)
+ * @param callStack the document processors to call in this processing.
+ * @param endp the endpoint of this processing
+ */
+ Processing(String service, DocumentOperation documentOperation, CallStack callStack, ProcessingEndpoint endp) {
+ this.service = service;
+ this.documentOperations = new ArrayList<>(1);
+ documentOperations.add(documentOperation);
+ this.callStack = callStack;
+ this.endpoint = endp;
+ }
+
+ /**
+ * Create a processing with one document. The given document put or document update will be the single
+ * element in <code>documentOperations</code>.
+ *
+ * @param service the unique name of the service processing this
+ * @param documentOperation document operation (DocumentPut or DocumentUpdate)
+ * @param callStack the document processors to call in this processing.
+ * This <b>tranfers ownership</b> of this structure
+ * to this class. The caller <i>must not</i> modify it
+ */
+ public Processing(String service, DocumentOperation documentOperation, CallStack callStack) {
+ this(service, documentOperation, callStack, null);
+ }
+
+ @SuppressWarnings("unused")
+ private Processing(String service, List<DocumentOperation> documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp, boolean unused) {
+ this.service = service;
+ this.documentOperations = new ArrayList<>(documentOpsAndUpdates.size());
+ documentOperations.addAll(documentOpsAndUpdates);
+ this.callStack = callStack;
+ this.endpoint = endp;
+ }
+
+ static Processing createProcessingFromDocumentOperations(String service, List<DocumentOperation> documentOpsAndUpdates, CallStack callStack, ProcessingEndpoint endp) {
+ return new Processing(service, documentOpsAndUpdates, callStack, endp, false);
+ }
+
+ /**
+ *
+ * @param service the unique name of the service processing this
+ * @param documentsAndUpdates the document operation list. This <b>transfers ownership</b> of this list
+ * to this class. The caller <i>must not</i> modify it
+ * @param callStack the document processors to call in this processing.
+ * This <b>transfers ownership</b> of this structure
+ * to this class. The caller <i>must not</i> modify it
+ */
+ public static Processing createProcessingFromDocumentOperations(String service, List<DocumentOperation> documentsAndUpdates, CallStack callStack) {
+ return new Processing(service, documentsAndUpdates, callStack, null, false);
+ }
+
+ public ComponentRegistry<DocprocService> getDocprocServiceRegistry() {
+ return docprocServiceRegistry;
+ }
+
+ public void setDocprocServiceRegistry(ComponentRegistry<DocprocService> docprocServiceRegistry) {
+ this.docprocServiceRegistry = docprocServiceRegistry;
+ }
+
+ /** Returns the name of the service processing this. This will never return null */
+ public String getServiceName() {
+ if (service == null) return "default";
+ return service;
+ }
+
+ /** Sets the name of the service processing this. */
+ public void setServiceName(String service) {
+ this.service = service;
+ }
+
+ /**
+ * Convenience method for looking up and returning the service processing this. This might return null
+ * if #getServiceName returns a name that is not registered in {@link com.yahoo.docproc.DocprocService}.
+ *
+ * @return the service processing this, or null if unknown.
+ */
+ public DocprocService getService() {
+ if (docprocServiceRegistry != null) {
+ return docprocServiceRegistry.getComponent(getServiceName());
+ }
+ return null;
+ }
+
+ /** Returns a context variable, or null if it is not set */
+ public Object getVariable(String name) {
+ if (context == null) return null;
+ return context.get(name);
+ }
+
+ /**
+ * Returns an iterator of all context variables that are set
+ *
+ * @return an iterator over objects of type Map.Entry
+ */
+ public Iterator<Map.Entry<String, Object>> getVariableAndNameIterator() {
+ if (context == null) context = new HashMap<>();
+ return context.entrySet().iterator();
+ }
+
+ /** Clears all context variables that have been set */
+ public void clearVariables() {
+ if (context == null) return;
+ context.clear();
+ }
+
+ /** Sets a context variable. */
+ public void setVariable(String name, Object value) {
+ if (context == null) context = new java.util.HashMap<>();
+ context.put(name, value);
+ }
+
+ public Object removeVariable(String name) {
+ if (context == null) return null;
+ return context.remove(name);
+ }
+
+ /** Returns true if this variable is present, even if it is null */
+ public boolean hasVariable(String name) {
+ return context != null && context.containsKey(name);
+ }
+
+ /**
+ * Returns the ProcessingEndpoint that is called when this Processing is complete, if any.
+ *
+ * @return the ProcessingEndpoint, or null
+ */
+ ProcessingEndpoint getEndpoint() {
+ return endpoint;
+ }
+
+ /**
+ * Sets the ProcessingEndpoint to be called when this Processing is complete.
+ *
+ * @param endpoint the ProcessingEndpoint to use
+ */
+ void setEndpoint(ProcessingEndpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public void addDocumentOperation(DocumentOperation documentOperation) {
+ if (documentsToAdd == null) documentsToAdd = new ArrayList<>(1);
+ documentsToAdd.add(documentOperation);
+ }
+
+ private void updateDocumentOperations() {
+ if (documentsToAdd != null) {
+ documentOperations.addAll(documentsToAdd);
+ documentsToAdd.clear();
+ }
+ }
+
+ public List<DocumentOperation> getDocumentOperations() {
+ updateDocumentOperations();
+ return documentOperations;
+ }
+
+ /** Returns the processors to call in this processing */
+ public CallStack callStack() {
+ return callStack;
+ }
+
+ /**
+ * Package-private method to set the callstack of this processing. Only to be used
+ * by DocprocService.process(Processing).
+ *
+ * @param callStack the callstack to set
+ */
+ void setCallStack(CallStack callStack) {
+ this.callStack = callStack;
+ }
+
+ public String toString() {
+ String previousCall = "";
+ if (callStack != null) {
+ Call call = callStack.getLastPopped();
+ if (call != null) {
+ previousCall = "Last call: " + call;
+ }
+ }
+ if (documentOperations.size() == 1) {
+ return "Processing of " + documentOperations.get(0) + ". " + previousCall;
+ } else {
+ String listString = documentOperations.toString();
+ if (listString.length() > 100) {
+ listString = listString.substring(0, 99);
+ listString += "...]";
+ }
+
+ return "Processing of " + listString + ". " + previousCall;
+ }
+ }
+
+ int getNumDocsToBeProcessed() {
+ if (getNumDocsCalled) {
+ return 0;
+ }
+ getNumDocsCalled = true;
+ return getDocumentOperations().size();
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java b/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java
new file mode 100644
index 00000000000..2159147a476
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/ProcessingEndpoint.java
@@ -0,0 +1,13 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public interface ProcessingEndpoint {
+
+ public void processingDone(Processing processing);
+
+ public void processingFailed(Processing processing, Exception exception);
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java
new file mode 100644
index 00000000000..16a207171cb
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java
@@ -0,0 +1,120 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.log.LogLevel;
+
+/**
+ * <p>Simple layer on top of {@link DocumentProcessor}, in order to make docproc
+ * development more user friendly and to the point.</p>
+ *
+ * <p>This simply iterates over the {@link DocumentOperation}s in {@link Processing#getDocumentOperations}, and calls
+ * the appropriate process() method given by this class.</p>
+ *
+ * <p>Note that more sophisticated use cases should subclass {@link DocumentProcessor} instead. Specifically,
+ * it is not possible to return a {@link DocumentProcessor.LaterProgress} from any of the process() methods that SimpleDocumentProcessor
+ * provides - since their return type is void.</p>
+ *
+ * <p>SimpleDocumentProcessor is for the <em>simple</em> cases. For complete control over document processing,
+ * like returning instances of {@link DocumentProcessor.LaterProgress}, subclass {@link DocumentProcessor} instead.</p>
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ */
+public class SimpleDocumentProcessor extends DocumentProcessor {
+ /**
+ * Override this to process the Document inside a DocumentPut.
+ * @deprecated use process(DocumentPut)
+ *
+ * @param document the Document to process.
+ */
+ public void process(Document document) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Ignored " + document);
+ }
+ }
+
+ /**
+ * Override this to process DocumentPuts. If this method is not overridden, the implementation in this class
+ * will ignore DocumentPuts (passing them through un-processed). If processing of this DocumentPut fails, the
+ * implementation must throw a {@link RuntimeException}.
+ *
+ * @param put the DocumentPut to process.
+ */
+ public void process(DocumentPut put) {
+ process(put.getDocument());
+ }
+
+ /**
+ * Override this to process DocumentUpdates. If this method is not overridden, the implementation in this class
+ * will ignore DocumentUpdates (passing them through un-processed). If processing of this DocumentUpdate fails, the
+ * implementation must throw a {@link RuntimeException}.
+ *
+ * @param update the DocumentUpdate to process.
+ */
+ public void process(DocumentUpdate update) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Ignored " + update);
+ }
+ }
+
+ /**
+ * Override this to process DocumentRemoves. If this method is not overridden, the implementation in this class
+ * will ignore DocumentRemoves (passing them through un-processed). If processing of this DocumentRemove fails, the
+ * implementation must throw a {@link RuntimeException}.
+ *
+ * @param remove the DocumentRemove to process.
+ */
+ public void process(DocumentRemove remove) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Ignored " + remove);
+ }
+ }
+
+ /**
+ * Simple process() that follows the official guidelines for
+ * looping over {@link DocumentOperation}s, and then calls the appropriate,
+ * overloaded process() depending on the type of base.
+ * <p>
+ * Declared as final, so if you want to handle everything yourself
+ * you should of course extend DocumentProcessor instead of
+ * SimpleDocumentProcessor and just go about as usual.
+ * <p>
+ * It is important to note that when iterating over the {@link DocumentOperation}s in
+ * {@link com.yahoo.docproc.Processing#getDocumentOperations()}, an exception thrown
+ * from any of the process() methods provided by this class will be thrown straight
+ * out of this here. This means that failing one document will fail the
+ * entire batch.
+ *
+ * @param processing the Processing to process.
+ * @return Progress.DONE, unless a subclass decides to throw an exception
+ */
+ @Override
+ public final Progress process(Processing processing) {
+ final int initialSize = processing.getDocumentOperations().size();
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ try {
+ if (op instanceof DocumentPut) {
+ process((DocumentPut) op);
+ } else if (op instanceof DocumentUpdate) {
+ process((DocumentUpdate) op);
+ } else if (op instanceof DocumentRemove) {
+ process((DocumentRemove) op);
+ }
+ } catch (RuntimeException e) {
+ if (log.isLoggable(LogLevel.DEBUG) && initialSize != 1) {
+ log.log(LogLevel.DEBUG,
+ "Processing of document failed, from processing.getDocumentOperations() containing " +
+ initialSize + " DocumentOperation(s).", e);
+ }
+ throw e;
+ }
+ }
+
+ return Progress.DONE;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java b/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java
new file mode 100644
index 00000000000..a0e49bb4cb2
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/TransientFailureException.java
@@ -0,0 +1,14 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+/**
+ * Exception to be thrown by a document processor on transient failures.&nbsp;Caller
+ * is welcome to try the call again later.
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class TransientFailureException extends RuntimeException {
+ public TransientFailureException(String s) {
+ super(s);
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/configuration/.gitignore b/docproc/src/main/java/com/yahoo/docproc/configuration/.gitignore
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/configuration/.gitignore
diff --git a/docproc/src/main/java/com/yahoo/docproc/documentstatus/.gitignore b/docproc/src/main/java/com/yahoo/docproc/documentstatus/.gitignore
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/documentstatus/.gitignore
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java
new file mode 100644
index 00000000000..134896e3b0c
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadManager.java
@@ -0,0 +1,58 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.document.DocumentUtil;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.log.LogLevel;
+import com.yahoo.statistics.*;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+class DocprocThreadManager {
+ private static Logger log = Logger.getLogger(DocprocThreadManager.class.getName());
+
+ private final long maxConcurrentByteSize;
+ private final AtomicLong bytesStarted = new AtomicLong(0);
+ private final AtomicLong bytesFinished = new AtomicLong(0);
+
+ DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb) {
+ this(maxConcurrentFactor, documentExpansionFactor, containerCoreMemoryMb, Statistics.nullImplementation,
+ new NullMetric());
+ }
+
+ DocprocThreadManager(double maxConcurrentFactor, double documentExpansionFactor, int containerCoreMemoryMb,
+ Statistics statistics, Metric metric) {
+ this((long) (((double) DocumentUtil.calculateMaxPendingSize(maxConcurrentFactor, documentExpansionFactor,
+ containerCoreMemoryMb)) * maxConcurrentFactor));
+ }
+
+ DocprocThreadManager(long maxConcurrentByteSize) {
+ final int MINCONCURRENTBYTES=256*1024*1024; //256M
+ if (maxConcurrentByteSize < MINCONCURRENTBYTES) {
+ maxConcurrentByteSize = MINCONCURRENTBYTES;
+ }
+
+ this.maxConcurrentByteSize = maxConcurrentByteSize;
+ log.log(LogLevel.CONFIG, "Docproc service allowed to concurrently process "
+ + (((double) maxConcurrentByteSize) / 1024.0d / 1024.0d) + " megabytes of input data.");
+ }
+
+ boolean isAboveLimit() {
+ return (bytesFinished.get() - bytesStarted.get() > maxConcurrentByteSize);
+ }
+ void beforeExecute(DocumentProcessingTask task) {
+ bytesStarted.getAndAdd(task.getApproxSize());
+ }
+
+ void afterExecute(DocumentProcessingTask task) {
+ bytesFinished.getAndAdd(task.getApproxSize());
+ }
+ void shutdown() {
+ }
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java
new file mode 100644
index 00000000000..dc690acfe0d
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutor.java
@@ -0,0 +1,58 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.log.LogLevel;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocprocThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private static Logger log = Logger.getLogger(DocprocThreadPoolExecutor.class.getName());
+ private DocprocThreadManager threadManager;
+
+ public DocprocThreadPoolExecutor(int maxNumThreads, BlockingQueue<Runnable> queue, DocprocThreadManager threadMgr) {
+ super((maxNumThreads > 0) ? maxNumThreads : Runtime.getRuntime().availableProcessors(),
+ (maxNumThreads > 0) ? maxNumThreads : Runtime.getRuntime().availableProcessors(),
+ 5, TimeUnit.MINUTES,
+ queue,
+ new DaemonThreadFactory("docproc-"));
+ this.threadManager = threadMgr;
+ allowCoreThreadTimeOut(false);
+ log.log(LogLevel.DEBUG, "Created docproc thread pool with " + super.getCorePoolSize() + " worker threads.");
+ }
+
+ @Override
+ protected void beforeExecute(Thread thread, Runnable runnable) {
+ threadManager.beforeExecute((DocumentProcessingTask) runnable);
+ }
+
+ @Override
+ protected void afterExecute(Runnable runnable, Throwable throwable) {
+ threadManager.afterExecute((DocumentProcessingTask) runnable);
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ threadManager.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ List<Runnable> list = super.shutdownNow();
+ threadManager.shutdown();
+ return list;
+ }
+
+ boolean isAboveLimit() {
+ return threadManager.isAboveLimit();
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
new file mode 100644
index 00000000000..8fa428482e1
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandler.java
@@ -0,0 +1,235 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.google.inject.Inject;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.component.chain.model.ChainsModel;
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.config.docproc.DocprocConfig;
+import com.yahoo.config.docproc.SchemamappingConfig;
+import com.yahoo.container.core.ChainsConfig;
+import com.yahoo.container.core.document.ContainerDocumentConfig;
+import com.yahoo.container.jdisc.ContainerMbusConfig;
+import com.yahoo.docproc.AbstractConcreteDocumentFactory;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;
+import com.yahoo.docproc.proxy.SchemaMap;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.documentapi.ThroughputLimitQueue;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.handler.AbstractRequestHandler;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.jdisc.MbusRequest;
+import com.yahoo.processing.execution.chain.ChainRegistry;
+import com.yahoo.statistics.Statistics;
+
+import java.util.TimerTask;
+import java.util.concurrent.*;
+import java.util.logging.Logger;
+
+import static com.yahoo.component.chain.ChainsConfigurer.prepareChainRegistry;
+import static com.yahoo.component.chain.model.ChainsModelBuilder.buildFromConfig;
+
+/**
+ * TODO: Javadoc
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocumentProcessingHandler extends AbstractRequestHandler {
+
+ private static Logger log = Logger.getLogger(DocumentProcessingHandler.class.getName());
+ private final ComponentRegistry<DocprocService> docprocServiceRegistry;
+ private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry;
+ private final ChainRegistry<DocumentProcessor> chainRegistry = new ChainRegistry<>();
+ private DocprocThreadPoolExecutor threadPool;
+ private final ScheduledThreadPoolExecutor laterExecutor =
+ new ScheduledThreadPoolExecutor(2, new DaemonThreadFactory("docproc-later-"));
+ private ContainerDocumentConfig containerDocConfig;
+ private final DocumentTypeManager documentTypeManager;
+
+ public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry,
+ ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry,
+ ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
+ DocprocThreadPoolExecutor threadPool, DocumentTypeManager documentTypeManager,
+ ChainsModel chainsModel, SchemaMap schemaMap, Statistics statistics,
+ Metric metric,
+ ContainerDocumentConfig containerDocConfig) {
+ this.docprocServiceRegistry = docprocServiceRegistry;
+ this.docFactoryRegistry = docFactoryRegistry;
+ this.threadPool = threadPool;
+ this.containerDocConfig = containerDocConfig;
+ this.documentTypeManager = documentTypeManager;
+ DocprocService.schemaMap = schemaMap;
+ threadPool.prestartCoreThread();
+ laterExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ laterExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+
+ if (chainsModel != null) {
+ prepareChainRegistry(chainRegistry, chainsModel, documentProcessorComponentRegistry);
+
+ for (Chain<DocumentProcessor> chain : chainRegistry.allComponents()) {
+ log.config("Setting up call stack for chain " + chain.getId());
+ DocprocService service =
+ new DocprocService(chain.getId(), convertToCallStack(chain, statistics, metric), documentTypeManager);
+ service.setInService(true);
+ docprocServiceRegistry.register(service.getId(), service);
+ }
+ }
+ }
+
+ public DocumentProcessingHandler(ComponentRegistry<DocprocService> docprocServiceRegistry,
+ ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry,
+ ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
+ DocumentProcessingHandlerParameters params) {
+ this(docprocServiceRegistry, documentProcessorComponentRegistry, docFactoryRegistry,
+ new DocprocThreadPoolExecutor(params.getMaxNumThreads(),
+ (params.getMaxQueueTimeMs() > 0)
+ ? new ThroughputLimitQueue<>(params.getMaxQueueTimeMs())
+ : (params.getMaxQueueTimeMs() < 0)
+ ? new LinkedBlockingQueue<>()
+ : new PriorityBlockingQueue<>(), //Probably no need to bound this queue, see bug #4254537
+ new DocprocThreadManager(params.getMaxConcurrentFactor(),
+ params.getDocumentExpansionFactor(),
+ params.getContainerCoreMemoryMb(),
+ params.getStatisticsManager(),
+ params.getMetric())),
+ params.getDocumentTypeManager(), params.getChainsModel(), params.getSchemaMap(),
+ params.getStatisticsManager(),
+ params.getMetric(),
+ params.getContainerDocConfig());
+ }
+
+ @Inject
+ public DocumentProcessingHandler(ComponentRegistry<DocumentProcessor> documentProcessorComponentRegistry,
+ ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
+ ChainsConfig chainsConfig,
+ SchemamappingConfig mappingConfig,
+ DocumentmanagerConfig docManConfig,
+ DocprocConfig docprocConfig,
+ ContainerMbusConfig containerMbusConfig,
+ ContainerDocumentConfig containerDocConfig,
+ Statistics manager,
+ Metric metric) {
+ this(new ComponentRegistry<>(),
+ documentProcessorComponentRegistry, docFactoryRegistry, new DocumentProcessingHandlerParameters().setMaxNumThreads
+ (docprocConfig.numthreads())
+ .setMaxConcurrentFactor(containerMbusConfig.maxConcurrentFactor())
+ .setDocumentExpansionFactor(containerMbusConfig.documentExpansionFactor())
+ .setContainerCoreMemoryMb(containerMbusConfig.containerCoreMemory())
+ .setMaxQueueTimeMs(docprocConfig.maxqueuetimems())
+ .setDocumentTypeManager(new DocumentTypeManager(docManConfig))
+ .setChainsModel(buildFromConfig(chainsConfig)).setSchemaMap(configureMapping(mappingConfig))
+ .setStatisticsManager(manager)
+ .setMetric(metric)
+ .setContainerDocumentConfig(containerDocConfig));
+ }
+
+ @Override
+ protected void destroy() {
+ threadPool.shutdown(); //calling shutdownNow() seems like a bit of an overkill
+ }
+
+ public ComponentRegistry<DocprocService> getDocprocServiceRegistry() {
+ return docprocServiceRegistry;
+ }
+
+ public ChainRegistry<DocumentProcessor> getChains() {
+ return chainRegistry;
+ }
+
+ private static SchemaMap configureMapping(SchemamappingConfig mappingConfig) {
+ SchemaMap map = new SchemaMap();
+ map.configure(mappingConfig);
+ return map;
+ }
+
+
+ private static CallStack convertToCallStack(Chain<DocumentProcessor> chain, Statistics statistics, Metric metric) {
+ CallStack stack = new CallStack(chain.getId().stringValue(), statistics, metric);
+ for (DocumentProcessor processor : chain.components()) {
+ processor.getFieldMap().putAll(DocprocService.schemaMap.chainMap(chain.getId().stringValue(), processor.getId().stringValue()));
+ stack.addLast(processor);
+ }
+ return stack;
+ }
+
+ @Override
+ public ContentChannel handleRequest(Request request, ResponseHandler handler) {
+ RequestContext requestContext;
+ if (request instanceof MbusRequest) {
+ requestContext = new MbusRequestContext((MbusRequest) request, handler, docprocServiceRegistry, docFactoryRegistry, containerDocConfig);
+ } else {
+ //Other types can be added here in the future
+ throw new IllegalArgumentException("Request type not supported: " + request);
+ }
+
+ if (!requestContext.isProcessable()) {
+ requestContext.skip();
+ return null;
+ }
+
+ DocprocService service = docprocServiceRegistry.getComponent(requestContext.getServiceName());
+ //No need to enqueue a task if the docproc chain is empty, just forward requestContext
+ if (service == null) {
+ log.log(LogLevel.ERROR, "DocprocService for session '" + requestContext.getServiceName() +
+ "' not found, returning request '" + requestContext + "'.");
+ requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE,
+ "DocprocService " + requestContext.getServiceName() + " not found.");
+ return null;
+ } else if (service.getExecutor().getCallStack().size() == 0) {
+ //call stack was empty, just forward message
+ requestContext.skip();
+ return null;
+ }
+
+ DocumentProcessingTask task = new DocumentProcessingTask(requestContext, this, service);
+ submit(task);
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ void submit(DocumentProcessingTask task) {
+ if (threadPool.isAboveLimit()) {
+ task.queueFull();
+ } else {
+ try {
+ threadPool.execute(task);
+ } catch (RejectedExecutionException ree) {
+ task.queueFull();
+ }
+ }
+ }
+
+ void submit(DocumentProcessingTask task, long delay) {
+ LaterTimerTask timerTask = new LaterTimerTask(task, delay);
+ laterExecutor.schedule(timerTask, delay, TimeUnit.MILLISECONDS);
+ }
+
+ private class LaterTimerTask extends TimerTask {
+ private DocumentProcessingTask processingTask;
+ private long delay;
+
+ private LaterTimerTask(DocumentProcessingTask processingTask, long delay) {
+ this.delay = delay;
+ log.log(LogLevel.DEBUG, "Enqueueing in " + delay + " ms due to Progress.LATER: " + processingTask);
+ this.processingTask = processingTask;
+ }
+
+ @Override
+ public void run() {
+ log.log(LogLevel.DEBUG, "Submitting after having waited " + delay + " ms in LATER queue: " + processingTask);
+ submit(processingTask);
+ }
+ }
+
+ public DocumentTypeManager getDocumentTypeManager() {
+ return documentTypeManager;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
new file mode 100644
index 00000000000..1a79e4e13c8
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerParameters.java
@@ -0,0 +1,169 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.component.chain.model.ChainsModel;
+import com.yahoo.container.core.document.ContainerDocumentConfig;
+import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.docproc.proxy.SchemaMap;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.jdisc.Metric;
+import com.yahoo.statistics.Statistics;
+
+/**
+ * Class to hold parameters given to DocumentProcessingHandler, typically used by unit tests.
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @see com.yahoo.docproc.jdisc.DocumentProcessingHandler
+ */
+public class DocumentProcessingHandlerParameters {
+ private int maxNumThreads = 0;
+ private double maxConcurrentFactor = 0.2;
+ private double documentExpansionFactor = 20.0;
+ private int containerCoreMemoryMb = 50;
+ private long maxQueueTimeMs = 0;
+ private DocumentTypeManager documentTypeManager = null;
+ private ChainsModel chainsModel = null;
+ private SchemaMap schemaMap = null;
+ private Statistics statisticsManager = Statistics.nullImplementation;
+ private Metric metric = new NullMetric();
+ private ContainerDocumentConfig containerDocConfig;
+
+ public DocumentProcessingHandlerParameters() {
+ }
+
+ /**
+ * Returns the number of megabytes of memory reserved for container core classes and data.
+ *
+ * @return the number of megabytes of memory reserved for container core classes and data.
+ */
+ public int getContainerCoreMemoryMb() {
+ return containerCoreMemoryMb;
+ }
+
+ public DocumentProcessingHandlerParameters setContainerCoreMemoryMb(int containerCoreMemoryMb) {
+ this.containerCoreMemoryMb = containerCoreMemoryMb;
+ return this;
+ }
+
+ public Metric getMetric() {
+ return metric;
+ }
+
+ public DocumentProcessingHandlerParameters setMetric(Metric metric) {
+ this.metric = metric;
+ return this;
+ }
+
+ /**
+ * Returns the document expansion factor, i.e.&nbsp;by what factor a serialized and possibly compressed
+ * input document is expected to expand during deserialization, including any temporary memory needed
+ * when processing it.
+ *
+ * @return the document expansion factor.
+ */
+ public double getDocumentExpansionFactor() {
+ return documentExpansionFactor;
+ }
+
+ public DocumentProcessingHandlerParameters setDocumentExpansionFactor(double documentExpansionFactor) {
+ this.documentExpansionFactor = documentExpansionFactor;
+ return this;
+ }
+
+ /**
+ * Returns the max concurrent factor.
+ *
+ * @return the max concurrent factor.
+ */
+ public double getMaxConcurrentFactor() {
+ return maxConcurrentFactor;
+ }
+
+ public DocumentProcessingHandlerParameters setMaxConcurrentFactor(double maxConcurrentFactor) {
+ this.maxConcurrentFactor = maxConcurrentFactor;
+ return this;
+ }
+
+ /**
+ * Returns the maximum time (in milliseconds) that a document may stay in the input queue.&nbsp;The default value
+ * of 0 disables this functionality.
+ *
+ * @return the maximum time (in milliseconds) that a document may stay in the input queue.
+ */
+ public long getMaxQueueTimeMs() {
+ return maxQueueTimeMs;
+ }
+
+ public DocumentProcessingHandlerParameters setMaxQueueTimeMs(long maxQueueTimeMs) {
+ this.maxQueueTimeMs = maxQueueTimeMs;
+ return this;
+ }
+
+ /**
+ * Returns the maximum number of thread that the thread pool will ever attempt to run simultaneously.
+ *
+ * @return the maximum number of thread that the thread pool will ever attempt to run simultaneously.
+ */
+ public int getMaxNumThreads() {
+ return maxNumThreads;
+ }
+
+ public DocumentProcessingHandlerParameters setMaxNumThreads(int maxNumThreads) {
+ this.maxNumThreads = maxNumThreads;
+ return this;
+ }
+
+ public DocumentTypeManager getDocumentTypeManager() {
+ return documentTypeManager;
+ }
+
+ public DocumentProcessingHandlerParameters setDocumentTypeManager(DocumentTypeManager documentTypeManager) {
+ this.documentTypeManager = documentTypeManager;
+ return this;
+ }
+
+ /**
+ * Returns the chains model, used to build call stacks.
+ * @return the chains model, used to build call stacks.
+ */
+ public ChainsModel getChainsModel() {
+ return chainsModel;
+ }
+
+ public DocumentProcessingHandlerParameters setChainsModel(ChainsModel chainsModel) {
+ this.chainsModel = chainsModel;
+ return this;
+ }
+
+ /**
+ * Returns the schema map to be used by the docproc handler.
+ *
+ * @return the schema map to be used by the docproc handler.
+ */
+ public SchemaMap getSchemaMap() {
+ return schemaMap;
+ }
+
+ public DocumentProcessingHandlerParameters setSchemaMap(SchemaMap schemaMap) {
+ this.schemaMap = schemaMap;
+ return this;
+ }
+
+ public Statistics getStatisticsManager() {
+ return statisticsManager;
+ }
+
+ public DocumentProcessingHandlerParameters setStatisticsManager(Statistics statisticsManager) {
+ this.statisticsManager = statisticsManager;
+ return this;
+ }
+
+ public DocumentProcessingHandlerParameters setContainerDocumentConfig(ContainerDocumentConfig containerDocConfig) {
+ this.containerDocConfig = containerDocConfig;
+ return this;
+ }
+
+ public ContainerDocumentConfig getContainerDocConfig() {
+ return containerDocConfig;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java
new file mode 100644
index 00000000000..7e92e8eb5a1
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/DocumentProcessingTask.java
@@ -0,0 +1,235 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.collections.Tuple2;
+import com.yahoo.docproc.Call;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.DocprocExecutor;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.HandledProcessingException;
+import com.yahoo.docproc.Processing;
+import com.yahoo.log.LogLevel;
+import com.yahoo.yolean.Exceptions;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocumentProcessingTask implements Comparable<DocumentProcessingTask>, Runnable {
+ private static Logger log = Logger.getLogger(DocumentProcessingTask.class.getName());
+ private final List<Processing> processings = new ArrayList<>();
+ private final List<Processing> processingsDone = new ArrayList<>();
+
+ private final DocumentProcessingHandler docprocHandler;
+ private RequestContext requestContext;
+ private int waitCounter;
+
+ private final static AtomicLong seq = new AtomicLong();
+ private final long seqNum;
+ private final DocprocService service;
+
+ public DocumentProcessingTask(RequestContext requestContext, DocumentProcessingHandler docprocHandler,
+ DocprocService service) {
+ seqNum = seq.getAndIncrement();
+ this.requestContext = requestContext;
+ this.docprocHandler = docprocHandler;
+ this.waitCounter = 10;
+ this.service = service;
+ }
+
+ @Override
+ public void run() {
+ try {
+ try {
+ processings.addAll(requestContext.getProcessings());
+ } catch (Exception e) {
+ //deserialization failed:
+ log.log(LogLevel.WARNING, "Deserialization of message failed.", e);
+ requestContext.processingFailed(e);
+ return;
+ }
+
+ DocprocExecutor executor = service.getExecutor();
+ DocumentProcessor.Progress progress = process(executor);
+
+ if (DocumentProcessor.Progress.LATER.equals(progress) && !processings.isEmpty()) {
+ DocumentProcessor.LaterProgress laterProgress = (DocumentProcessor.LaterProgress) progress;
+ docprocHandler.submit(this, laterProgress.getDelay());
+ }
+ } catch (Error error) {
+ try {
+ log.log(LogLevel.FATAL, Exceptions.toMessageString(error), error);
+ } catch (Throwable t) {
+ // do nothing
+ } finally {
+ Runtime.getRuntime().halt(1);
+ }
+ }
+ }
+
+ /**
+ * Used by DocprocThreadManager. If a ProcessingTask has been taken by a thread, it can wait() no longer than
+ * waitCounter (currently 10) times before being executed. This is to prevent large tasks from being delayed
+ * forever.
+ *
+ * @return true if this task can wait, false if it must run NOW.
+ */
+ boolean doWait() {
+ --waitCounter;
+ return (waitCounter > 0);
+ }
+
+ /**
+ * Processes a single Processing, and fails the message if this processing fails.
+ *
+ * @param executor the DocprocService to use for processing
+ */
+ private DocumentProcessor.Progress process(DocprocExecutor executor) {
+ Iterator<Processing> iterator = processings.iterator();
+ List<Tuple2<DocumentProcessor.Progress, Processing>> later = new ArrayList<>();
+ while (iterator.hasNext()) {
+ Processing processing = iterator.next();
+ iterator.remove();
+ if (requestContext.hasExpired()) {
+ DocumentProcessor.Progress progress = DocumentProcessor.Progress.FAILED;
+ final String location;
+ if (processing != null) {
+ final CallStack callStack = processing.callStack();
+ if (callStack != null) {
+ final Call lastPopped = callStack.getLastPopped();
+ if (lastPopped != null) {
+ location = lastPopped.toString();
+ } else {
+ location = "empty call stack or no processors popped";
+ }
+ } else {
+ location = "no call stack";
+ }
+ } else {
+ location = "no processing instance";
+ }
+ String errorMsg = processing + " failed, " + location;
+ log.log(Level.FINE, "Time is up for '" + errorMsg + "'.");
+ requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE, "Time is up.");
+ return progress;
+ }
+
+ DocumentProcessor.Progress progress = DocumentProcessor.Progress.FAILED;
+ try {
+ progress = executor.process(processing);
+ } catch (Exception e) {
+ logProcessingFailure(processing, e);
+ requestContext.processingFailed(e);
+ return progress;
+ }
+
+ if (DocumentProcessor.Progress.LATER.equals(progress)) {
+ later.add(new Tuple2<>(progress, processing));
+ } else if (DocumentProcessor.Progress.DONE.equals(progress)) {
+ processingsDone.add(processing);
+ } else if (DocumentProcessor.Progress.FAILED.equals(progress)) {
+ logProcessingFailure(processing, null);
+ requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE,
+ "Document processing failed.");
+ return progress;
+ } else if (DocumentProcessor.Progress.PERMANENT_FAILURE.equals(progress)) {
+ logProcessingFailure(processing, null);
+ requestContext.processingFailed(RequestContext.ErrorCode.ERROR_PROCESSING_FAILURE,
+ "Document processing failed.");
+ return progress;
+ }
+ }
+
+ // Processings that have FAILED will have made this method terminate by now.
+ // We now have successful Processings in 'processingsDone' and
+ // the ones that have returned LATER in 'later'.
+
+ if (!later.isEmpty()) {
+ // Outdated comment:
+ // "if this was a multioperationmessage and more than one of the processings returned LATER,
+ // return the one with the lowest timeout:"
+ // As multioperation is removed this can probably be simplified?
+ DocumentProcessor.LaterProgress shortestDelay = (DocumentProcessor.LaterProgress) later.get(0).first;
+ for (Tuple2<DocumentProcessor.Progress, Processing> tuple : later) {
+ // re-add the LATER one to processings
+ processings.add(tuple.second);
+ // check to see if this one had a lower timeout than the previous one:
+ if (((DocumentProcessor.LaterProgress) tuple.first).getDelay() < shortestDelay.getDelay()) {
+ shortestDelay = (DocumentProcessor.LaterProgress) tuple.first;
+ }
+ }
+ return shortestDelay;
+ } else {
+ requestContext.processingDone(processingsDone);
+ return DocumentProcessor.Progress.DONE;
+ }
+ }
+
+
+ void queueFull() {
+ requestContext.processingFailed(RequestContext.ErrorCode.ERROR_BUSY,
+ "Queue temporarily full. Returning message " + requestContext +
+ ". Will be automatically resent.");
+ }
+
+ public int compareTo(DocumentProcessingTask other) {
+ int ourPriority = requestContext.getPriority();
+ int otherPriority = other.requestContext.getPriority();
+ int res = (ourPriority == otherPriority) ? 0 : ((ourPriority < otherPriority) ? -1 : 1);
+ if (res == 0) {
+ res = (seqNum == other.seqNum) ? 0 : ((seqNum < other.seqNum) ? -1 : 1);
+ }
+ return res;
+ }
+
+ @Override
+ public String toString() {
+ return "ProcessingTask{" +
+ "processings=" + processings +
+ ", processingsDone=" + processingsDone +
+ ", requestContext=" + requestContext +
+ ", seqNum=" + seqNum +
+ '}';
+ }
+
+ public int getApproxSize() {
+ return requestContext.getApproxSize();
+ }
+
+ final long getSeqNum() {
+ return seqNum;
+ }
+
+ private static void logProcessingFailure(Processing processing, Exception exception) {
+ //LOGGING ONLY:
+ String errorMsg = processing + " failed at " + processing.callStack().getLastPopped();
+ if (exception != null) {
+ if (exception instanceof HandledProcessingException) {
+ errorMsg += ". Error message: " + exception.getMessage();
+ log.log(Level.WARNING, errorMsg);
+ log.log(Level.FINE, "Chained exception:", exception);
+ } else {
+ log.log(Level.WARNING, errorMsg, exception);
+ }
+ } else {
+ log.log(Level.WARNING, errorMsg);
+ }
+ //LOGGING OF STACK TRACE:
+ if (exception != null) {
+ StringWriter backtrace = new StringWriter();
+ exception.printStackTrace(new PrintWriter(backtrace));
+ log.log(LogLevel.DEBUG, "Failed to process " + processing + ": " + backtrace.toString());
+ }
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java
new file mode 100644
index 00000000000..eeda93f12a1
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/RequestContext.java
@@ -0,0 +1,66 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.docproc.Processing;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.Response;
+
+import java.net.URI;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public interface RequestContext {
+
+ public List<Processing> getProcessings();
+
+ public String getServiceName();
+
+ public URI getUri();
+
+ public boolean isProcessable();
+
+ public int getApproxSize();
+
+ public int getPriority();
+
+ public void processingDone(List<Processing> processing);
+
+ public void processingFailed(ErrorCode error, String msg);
+
+ public void processingFailed(Exception exception);
+
+ /**
+ * Will check if the given timeout has expired
+ * @return true if the timeout has expired.
+ */
+ public default boolean hasExpired() { return false;}
+
+ public void skip();
+
+ public enum ErrorCode {
+ //transient:
+ ERROR_ABORTED(Response.Status.TEMPORARY_REDIRECT, DocumentProtocol.ERROR_ABORTED),
+ ERROR_BUSY(Response.Status.TEMPORARY_REDIRECT, DocumentProtocol.ERROR_BUSY),
+ //fatal:
+ ERROR_PROCESSING_FAILURE(Response.Status.INTERNAL_SERVER_ERROR, DocumentProtocol.ERROR_PROCESSING_FAILURE);
+
+
+ private int discStatus;
+ private int documentProtocolStatus;
+
+ private ErrorCode(int discStatus, int documentProtocolStatus) {
+ this.discStatus = discStatus;
+ this.documentProtocolStatus = documentProtocolStatus;
+ }
+
+ public int getDiscStatus() {
+ return discStatus;
+ }
+
+ public int getDocumentProtocolStatus() {
+ return documentProtocolStatus;
+ }
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java
new file mode 100644
index 00000000000..98a091e5dfd
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MbusRequestContext.java
@@ -0,0 +1,235 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc.messagebus;
+
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.concurrent.CopyOnWriteHashMap;
+import com.yahoo.container.core.document.ContainerDocumentConfig;
+import com.yahoo.docproc.AbstractConcreteDocumentFactory;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.docproc.HandledProcessingException;
+import com.yahoo.docproc.Processing;
+import com.yahoo.docproc.TransientFailureException;
+import com.yahoo.docproc.jdisc.RequestContext;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.Request;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.RequestDispatch;
+import com.yahoo.jdisc.handler.ResponseDispatch;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.jdisc.MbusRequest;
+import com.yahoo.messagebus.jdisc.MbusResponse;
+import com.yahoo.messagebus.routing.Route;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class MbusRequestContext implements RequestContext, ResponseHandler {
+
+ private final static Logger log = Logger.getLogger(MbusRequestContext.class.getName());
+ private final static CopyOnWriteHashMap<String, URI> uriCache = new CopyOnWriteHashMap<>();
+ private final AtomicBoolean deserialized = new AtomicBoolean(false);
+ private final AtomicBoolean responded = new AtomicBoolean(false);
+ private final ProcessingFactory processingFactory;
+ private final MessageFactory messageFactory;
+ private final MbusRequest request;
+ private final DocumentMessage requestMsg;
+ private final ResponseHandler responseHandler;
+ private volatile int cachedApproxSize;
+ // When spawning off new documents inside document processor, we do not want
+ // throttling since this can lead to live locks. This is because the
+ // document being processed is a resource and is then grabbing more resources of
+ // the same type without releasing its own resources.
+ public final static String internalNoThrottledSource = "internalNoThrottledSource";
+
+ public MbusRequestContext(MbusRequest request, ResponseHandler responseHandler,
+ ComponentRegistry<DocprocService> docprocServiceComponentRegistry,
+ ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
+ ContainerDocumentConfig containerDocConfig) {
+ this.request = request;
+ this.requestMsg = (DocumentMessage)request.getMessage();
+ this.responseHandler = responseHandler;
+ this.processingFactory = new ProcessingFactory(docprocServiceComponentRegistry, docFactoryRegistry,
+ containerDocConfig, getServiceName());
+ this.messageFactory = newMessageFactory(requestMsg);
+ }
+
+ @Override
+ public List<Processing> getProcessings() {
+ if (deserialized.getAndSet(true)) {
+ return Collections.emptyList();
+ }
+ return processingFactory.fromMessage(requestMsg);
+ }
+
+ @Override
+ public void skip() {
+ if (deserialized.get()) {
+ throw new IllegalStateException("Can not skip processing after deserialization.");
+ }
+ dispatchRequest(requestMsg, request.getUri().getPath(), responseHandler);
+ }
+
+ @Override
+ public void processingDone(List<Processing> processings) {
+ List<DocumentMessage> messages = new ArrayList<>();
+ if (messageFactory != null) {
+ for (Processing processing : processings) {
+ for (DocumentOperation documentOperation : processing.getDocumentOperations()) {
+ messages.add(messageFactory.fromDocumentOperation(processing, documentOperation));
+ }
+ }
+ }
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Forwarding " + messages.size() + " messages from " + processings.size() +
+ " processings.");
+ }
+ if (messages.isEmpty()) {
+ dispatchResponse(Response.Status.OK);
+ return;
+ }
+ long inputSequenceId = requestMsg.getSequenceId();
+ ResponseMerger responseHandler = new ResponseMerger(requestMsg, messages.size(), this);
+ for (Message message : messages) {
+ // See comment for internalNoThrottledSource.
+ dispatchRequest(message, (inputSequenceId == message.getSequenceId())
+ ? getUri().getPath()
+ : "/" + internalNoThrottledSource,
+ responseHandler);
+ }
+ }
+
+ @Override
+ public void processingFailed(Exception exception) {
+ ErrorCode errorCode;
+ if (exception instanceof TransientFailureException) {
+ errorCode = ErrorCode.ERROR_ABORTED;
+ } else {
+ errorCode = ErrorCode.ERROR_PROCESSING_FAILURE;
+ }
+ StringBuilder errorMsg = new StringBuilder("Processing failed.");
+ if (exception instanceof HandledProcessingException) {
+ errorMsg.append(" Error message: ").append(exception.getMessage());
+ } else if (exception != null) {
+ errorMsg.append(" Error message: ").append(exception.toString());
+ }
+ errorMsg.append(" -- See Vespa log for details.");
+ processingFailed(errorCode, errorMsg.toString());
+ }
+
+ @Override
+ public void processingFailed(ErrorCode errorCode, String errorMsg) {
+ MbusResponse response = new MbusResponse(errorCode.getDiscStatus(), requestMsg.createReply());
+ response.getReply().addError(new com.yahoo.messagebus.Error(errorCode.getDocumentProtocolStatus(), errorMsg));
+ ResponseDispatch.newInstance(response).dispatch(this);
+ }
+
+ @Override
+ public int getApproxSize() {
+ if (cachedApproxSize > 0) {
+ return cachedApproxSize;
+ }
+ cachedApproxSize = requestMsg.getApproxSize();
+ return cachedApproxSize;
+ }
+
+ @Override
+ public int getPriority() {
+ return requestMsg.getPriority().getValue();
+ }
+
+ @Override
+ public URI getUri() {
+ return request.getUri();
+ }
+
+ @Override
+ public String getServiceName() {
+ String path = getUri().getPath();
+ return path.substring(7, path.length());
+ }
+
+ @Override
+ public boolean isProcessable() {
+ Message msg = requestMsg;
+ switch (msg.getType()) {
+ case DocumentProtocol.MESSAGE_PUTDOCUMENT:
+ case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
+ case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
+ case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE:
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean hasExpired() {
+ return requestMsg.isExpired();
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ if (responded.getAndSet(true)) {
+ return null;
+ }
+ Reply reply = ((MbusResponse)response).getReply();
+ reply.swapState(requestMsg);
+ return responseHandler.handleResponse(response);
+ }
+
+ private void dispatchResponse(int status) {
+ ResponseDispatch.newInstance(new MbusResponse(status, requestMsg.createReply())).dispatch(this);
+ }
+
+ private void dispatchRequest(final Message msg, final String uriPath, final ResponseHandler handler) {
+ try {
+ new RequestDispatch() {
+
+ @Override
+ protected Request newRequest() {
+ return new MbusRequest(request, resolveUri(uriPath), msg);
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ return handler.handleResponse(response);
+ }
+ }.dispatch();
+ } catch (Exception e) {
+ dispatchResponse(Response.Status.INTERNAL_SERVER_ERROR);
+ e.printStackTrace();
+ }
+ }
+
+ private static MessageFactory newMessageFactory(final DocumentMessage msg) {
+ if (msg == null) {
+ return null;
+ }
+ final Route route = msg.getRoute();
+ if (route == null || !route.hasHops()) {
+ return null;
+ }
+ return new MessageFactory(msg);
+ }
+
+ private static URI resolveUri(String path) {
+ URI uri = uriCache.get(path);
+ if (uri == null) {
+ uri = URI.create("mbus://remotehost" + path);
+ uriCache.put(path, uri);
+ }
+ return uri;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MessageFactory.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MessageFactory.java
new file mode 100644
index 00000000000..d3962a20ea8
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/MessageFactory.java
@@ -0,0 +1,67 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc.messagebus;
+
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.*;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadType;
+import com.yahoo.documentapi.messagebus.protocol.*;
+import com.yahoo.log.LogLevel;
+import com.yahoo.messagebus.Message;
+import com.yahoo.vdslib.DocumentList;
+import com.yahoo.vdslib.Entry;
+
+import java.util.logging.Logger;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+class MessageFactory {
+
+ private final static Logger log = Logger.getLogger(MessageFactory.class.getName());
+ private final Message requestMsg;
+ private final LoadType loadType;
+ private final DocumentProtocol.Priority priority;
+
+ public MessageFactory(DocumentMessage requestMsg) {
+ this.requestMsg = requestMsg;
+ loadType = requestMsg.getLoadType();
+ priority = requestMsg.getPriority();
+ }
+
+ public DocumentMessage fromDocumentOperation(Processing processing, DocumentOperation documentOperation) {
+ DocumentMessage msg = newMessage(documentOperation);
+ msg.setLoadType(loadType);
+ msg.setPriority(priority);
+ msg.setRoute(requestMsg.getRoute());
+ msg.setTimeReceivedNow();
+ msg.setTimeRemaining(requestMsg.getTimeRemainingNow());
+ msg.getTrace().setLevel(requestMsg.getTrace().getLevel());
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Created '" + msg.getClass().getName() +
+ "', route = '" + msg.getRoute() +
+ "', priority = '" + msg.getPriority().name() +
+ "', load type = '" + msg.getLoadType() +
+ "', trace level = '" + msg.getTrace().getLevel() +
+ "', time remaining = '" + msg.getTimeRemaining() + "'.");
+ }
+ return msg;
+ }
+
+ private static DocumentMessage newMessage(DocumentOperation documentOperation) {
+ final TestAndSetMessage message;
+
+ if (documentOperation instanceof DocumentPut) {
+ message = new PutDocumentMessage(((DocumentPut)documentOperation));
+ } else if (documentOperation instanceof DocumentUpdate) {
+ message = new UpdateDocumentMessage((DocumentUpdate)documentOperation);
+ } else if (documentOperation instanceof DocumentRemove) {
+ message = new RemoveDocumentMessage(documentOperation.getId());
+ } else {
+ throw new UnsupportedOperationException(documentOperation.getClass().getName());
+ }
+
+ message.setCondition(documentOperation.getCondition());
+ return message;
+ }
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java
new file mode 100644
index 00000000000..19c9d2d86d1
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ProcessingFactory.java
@@ -0,0 +1,118 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc.messagebus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Logger;
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.container.core.document.ContainerDocumentConfig;
+import com.yahoo.docproc.AbstractConcreteDocumentFactory;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.documentapi.messagebus.protocol.BatchDocumentUpdateMessage;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.messagebus.Message;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+class ProcessingFactory {
+
+ private final static Logger log = Logger.getLogger(ProcessingFactory.class.getName());
+ private final ComponentRegistry<DocprocService> docprocServiceComponentRegistry;
+ private final ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry;
+ private final ContainerDocumentConfig containerDocConfig;
+ private final String serviceName;
+
+ public ProcessingFactory(ComponentRegistry<DocprocService> docprocServiceComponentRegistry,
+ ComponentRegistry<AbstractConcreteDocumentFactory> docFactoryRegistry,
+ ContainerDocumentConfig containerDocConfig,
+ String serviceName) {
+ this.docprocServiceComponentRegistry = docprocServiceComponentRegistry;
+ this.docFactoryRegistry = docFactoryRegistry;
+ this.containerDocConfig = containerDocConfig;
+ this.serviceName = serviceName;
+ }
+
+ public List<Processing> fromMessage(Message message) {
+ List<Processing> processings = new ArrayList<>();
+ switch (message.getType()) {
+ case DocumentProtocol.MESSAGE_PUTDOCUMENT: {
+ PutDocumentMessage putMessage = (PutDocumentMessage) message;
+ DocumentPut putOperation = new DocumentPut(createPutDocument(putMessage));
+ putOperation.setCondition(putMessage.getCondition());
+ processings.add(createProcessing(putOperation, message));
+ break;
+ }
+ case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: {
+ UpdateDocumentMessage updateMessage = (UpdateDocumentMessage) message;
+ DocumentUpdate updateOperation = updateMessage.getDocumentUpdate();
+ updateOperation.setCondition(updateMessage.getCondition());
+ processings.add(createProcessing(updateOperation, message));
+ break;
+ }
+ case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: {
+ RemoveDocumentMessage removeMessage = (RemoveDocumentMessage) message;
+ DocumentRemove removeOperation = new DocumentRemove(removeMessage.getDocumentId());
+ removeOperation.setCondition(removeMessage.getCondition());
+ processings.add(createProcessing(removeOperation, message));
+ break;
+ }
+ case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE: {
+ for (DocumentUpdate update : ((BatchDocumentUpdateMessage) message).getUpdates()) {
+ processings.add(createProcessing(update, message));
+ }
+ break;
+ }
+ }
+ return processings;
+ }
+
+ private Document createPutDocument(PutDocumentMessage msg) {
+ Document document = msg.getDocumentPut().getDocument();
+ String typeName = document.getDataType().getName();
+ ContainerDocumentConfig.Doctype typeConfig = getDocumentConfig(typeName);
+ if (typeConfig == null) {
+ return document;
+ }
+ return createConcreteDocument(document, typeConfig);
+ }
+
+ private Document createConcreteDocument(Document document, ContainerDocumentConfig.Doctype typeConfig) {
+ String componentId = typeConfig.factorycomponent(); // Class name of the factory
+ AbstractConcreteDocumentFactory cdf = docFactoryRegistry.getComponent(new ComponentId(componentId));
+ if (cdf == null) {
+ log.fine("Unable to get document factory component '" + componentId + "' from document factory registry.");
+ return document;
+ }
+ return cdf.getDocumentCopy(document.getDataType().getName(), document, document.getId());
+ }
+
+ private ContainerDocumentConfig.Doctype getDocumentConfig(String name) {
+ for (ContainerDocumentConfig.Doctype type : containerDocConfig.doctype()) {
+ if (name.equals(type.type())) {
+ return type;
+ }
+ }
+ return null;
+ }
+
+ private Processing createProcessing(DocumentOperation documentOperation, Message message) {
+ Processing processing = new Processing();
+ processing.addDocumentOperation(documentOperation);
+ processing.setServiceName(serviceName);
+ processing.setDocprocServiceRegistry(docprocServiceComponentRegistry);
+ processing.setVariable("route", message.getRoute());
+ processing.setVariable("timeout", message.getTimeRemaining());
+ return processing;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ResponseMerger.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ResponseMerger.java
new file mode 100644
index 00000000000..f29618a0e44
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/ResponseMerger.java
@@ -0,0 +1,56 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc.messagebus;
+
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.Response;
+import com.yahoo.jdisc.handler.ContentChannel;
+import com.yahoo.jdisc.handler.ResponseDispatch;
+import com.yahoo.jdisc.handler.ResponseHandler;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.TraceNode;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.jdisc.MbusResponse;
+import com.yahoo.messagebus.jdisc.StatusCodes;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+class ResponseMerger implements ResponseHandler {
+
+ private final Message requestMsg;
+ private final TraceNode requestTrace = new TraceNode().setStrict(false);
+ private final ResponseHandler responseHandler;
+ private final List<Reply> replies;
+ private int numPending;
+
+ public ResponseMerger(Message requestMsg, int numPending, ResponseHandler responseHandler) {
+ this.requestMsg = requestMsg;
+ this.responseHandler = responseHandler;
+ this.replies = new ArrayList<>(numPending);
+ this.numPending = numPending;
+ }
+
+ @Override
+ public ContentChannel handleResponse(Response response) {
+ synchronized (this) {
+ if (response instanceof MbusResponse) {
+ Reply reply = ((MbusResponse)response).getReply();
+ requestTrace.addChild(reply.getTrace().getRoot());
+ replies.add(reply);
+ }
+ if (--numPending != 0) {
+ return null;
+ }
+ }
+ requestMsg.getTrace().getRoot().addChild(requestTrace);
+ Reply reply = DocumentProtocol.merge(replies);
+ Response mbusResponse = new MbusResponse(StatusCodes.fromMbusReply(reply), reply);
+ ResponseDispatch.newInstance(mbusResponse).dispatch(responseHandler);
+ return null;
+ }
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/package-info.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/package-info.java
new file mode 100644
index 00000000000..2d704fc70da
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/messagebus/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.docproc.jdisc.messagebus;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/metric/NullMetric.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/metric/NullMetric.java
new file mode 100644
index 00000000000..e7c043d352d
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/metric/NullMetric.java
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc.metric;
+
+import com.yahoo.jdisc.Metric;
+
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class NullMetric implements Metric {
+ @Override
+ public void set(String key, Number val, Context ctx) {
+ }
+
+ @Override
+ public void add(String key, Number val, Context ctx) {
+ }
+
+ @Override
+ public Context createContext(Map<String, ?> properties) {
+ return NullContext.INSTANCE;
+ }
+
+ private static class NullContext implements Context {
+ private static final NullContext INSTANCE = new NullContext();
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/jdisc/package-info.java b/docproc/src/main/java/com/yahoo/docproc/jdisc/package-info.java
new file mode 100644
index 00000000000..610aa2462a4
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/jdisc/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/docproc/src/main/java/com/yahoo/docproc/package-info.java b/docproc/src/main/java/com/yahoo/docproc/package-info.java
new file mode 100644
index 00000000000..b95e4bb128b
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/package-info.java
@@ -0,0 +1,7 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi
+package com.yahoo.docproc;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java
new file mode 100644
index 00000000000..b3b29a41cc7
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocument.java
@@ -0,0 +1,361 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.proxy;
+
+import com.yahoo.docproc.Accesses;
+import com.yahoo.docproc.DocumentOperationWrapper;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.Field;
+import com.yahoo.document.FieldPath;
+import com.yahoo.document.datatypes.FieldPathIteratorHandler;
+import com.yahoo.document.datatypes.FieldPathIteratorHandler.ModificationStatus;
+import com.yahoo.document.datatypes.FieldValue;
+import com.yahoo.document.datatypes.Struct;
+import com.yahoo.document.serialization.DocumentReader;
+import com.yahoo.document.serialization.DocumentWriter;
+import com.yahoo.document.serialization.FieldReader;
+import com.yahoo.document.serialization.FieldWriter;
+import com.yahoo.document.serialization.SerializationException;
+import com.yahoo.document.serialization.XmlStream;
+import com.yahoo.vespa.objects.Serializer;
+
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * This is a facade to a Document, with multiple purposes: <ul> <li>Getters and setters for field data takes possibly
+ * into account a schema map of field names. <li>We support mapping into struct fields of arbitrary depth using
+ * from→mystruct.mystruct.myfield </ul> We also enforce the @Accesses annotation(s) of the doc proc which uses this.
+ *
+ * @author <a href="mailto:vegardh@yahoo-inc.com">Vegard Havdal</a>
+ */
+public class ProxyDocument extends Document implements DocumentOperationWrapper {
+
+ private static final long serialVersionUID = 1L;
+ private final Map<String, String> fieldMap;
+ private final Set<String> fieldsAllowed = new HashSet<>();
+ private final String docProcName;
+ private Document doc;
+
+ public ProxyDocument(DocumentProcessor docProc, Document doc, Map<String, String> fieldMap) {
+ super(doc);
+ if (docProc.getClass().getAnnotation(Accesses.class)!=null) {
+ for (com.yahoo.docproc.Accesses.Field field : docProc.getClass().getAnnotation(Accesses.class).value()) {
+ String name = field.name();
+ if (fieldMap!=null && fieldMap.get(name) !=null) name = fieldMap.get(name);
+ fieldsAllowed.add(name);
+ }
+ }
+ this.fieldMap = fieldMap;
+ this.docProcName = docProc.toString();
+ this.doc=doc;
+ }
+
+ private void checkAccess(Field field) {
+ if (!fieldsAllowed.isEmpty() && !fieldsAllowed.contains(field.getName())) {
+ throw new IllegalArgumentException("Processor '" + docProcName + "' is not allowed to access field '" +
+ field.getName() + "'.");
+ }
+ }
+
+ /**
+ * note that the returned Field may not be in this Document
+ * directly, but may refer to a field in a struct contained in it,
+ * in which case the returned Field is only useful for obtaining
+ * the field type; it can't be used for get() and set().
+ **/
+ @Override
+ public Field getField(String fieldName) {
+ if (fieldMap != null && fieldMap.containsKey(fieldName)) {
+ fieldName = fieldMap.get(fieldName);
+ }
+ FieldPath path = getFieldPath(fieldName);
+ Field ret = path.get(path.size() - 1).getFieldRef();
+ checkAccess(ret);
+ return ret;
+ }
+
+ @Override
+ public FieldValue getFieldValue(String fieldName) {
+ return getRecursiveValue(getFieldPath(fieldName));
+ }
+
+ @Override
+ public FieldValue getFieldValue(Field field) {
+ //checkAccess(field);
+ return doc.getFieldValue(field);
+ }
+
+ @Override
+ public FieldValue setFieldValue(String fieldName, FieldValue fieldValue) {
+ SetHandler handler = new SetHandler(fieldValue);
+ FieldPath path = getFieldPath(fieldName);
+ iterateNested(path, 0, handler);
+ if (!handler.doModifyCalled) {
+ //the value in question was not found
+ throw new IllegalArgumentException("Field '" + fieldName + "' mapped by '" + path + "' was not found.");
+ }
+ return handler.prevVal;
+ }
+
+ @Override
+ public FieldValue setFieldValue(Field field, FieldValue fieldValue) {
+ checkAccess(field);
+ return doc.setFieldValue(field, fieldValue);
+ }
+
+ @Override
+ public FieldValue removeFieldValue(String fieldName) {
+ RemoveHandler handler = new RemoveHandler();
+ FieldPath path = getFieldPath(fieldName);
+ iterateNested(path, 0, handler);
+ if (!handler.doModifyCalled) {
+ //the value in question was not found
+ throw new IllegalArgumentException("Field '" + fieldName + "' mapped by '" + path + "' was not found.");
+ }
+ return handler.prevVal;
+ }
+
+ @Override
+ public FieldValue removeFieldValue(Field field) {
+ checkAccess(field);
+ return doc.removeFieldValue(field);
+ }
+
+ private FieldPath getFieldPath(String fieldName) {
+ if (fieldMap != null && fieldMap.containsKey(fieldName)) {
+ fieldName = fieldMap.get(fieldName);
+ }
+ checkAccess(new Field(fieldName));
+ FieldPath path = FieldPath.newInstance(getDataType(), fieldName);
+ if (path == null || path.size() == 0) {
+ throw new IllegalArgumentException("Malformed schema mapping '" + fieldName + "'.");
+ }
+ return path;
+ }
+
+ public DocumentOperation getWrappedDocumentOperation() {
+ return new DocumentPut(this);
+ }
+
+ private static class SetHandler extends FieldPathIteratorHandler {
+
+ private final FieldValue nextVal;
+ private FieldValue prevVal;
+ private boolean doModifyCalled = false;
+
+ public SetHandler(FieldValue fieldValue) {
+ nextVal = fieldValue;
+ }
+
+ @Override
+ public boolean onComplex(FieldValue fieldVal) {
+ return false;
+ }
+
+ @Override
+ public boolean createMissingPath() {
+ return true;
+ }
+
+
+ @Override
+ public ModificationStatus doModify(FieldValue fieldVal) {
+ doModifyCalled = true;
+ prevVal = fieldVal.clone();
+ fieldVal.assign(nextVal);
+ return ModificationStatus.MODIFIED;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return doc.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return doc.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return doc.hashCode();
+ }
+
+ @Override
+ public Document clone() {
+ return doc.clone();
+ }
+
+ @Override
+ public void clear() {
+ doc.clear();
+ }
+
+ @Override
+ public Iterator<Entry<Field, FieldValue>> iterator() {
+ return doc.iterator();
+ }
+
+ @Override
+ public DocumentId getId() {
+ return doc.getId();
+ }
+
+ @Override
+ public void setLastModified(Long lastModified) {
+ doc.setLastModified(lastModified);
+ }
+
+ @Override
+ public Long getLastModified() {
+ return doc.getLastModified();
+ }
+
+ @Override
+ public void setId(DocumentId id) {
+ doc.setId(id);
+ }
+
+ @Override
+ public Struct getHeader() {
+ return doc.getHeader();
+ }
+
+ @Override
+ public Struct getBody() {
+ return doc.getBody();
+ }
+
+ @Override
+ public void assign(Object o) {
+ doc.assign(o);
+ }
+
+ @Override
+ public void setDataType(DataType type) {
+ doc.setDataType(type);
+ }
+
+ @Override
+ public int getSerializedSize() throws SerializationException {
+ return doc.getSerializedSize();
+ }
+
+ @Override
+ public void serialize(OutputStream out) throws SerializationException {
+ doc.serialize(out);
+ }
+
+ @Override
+ protected void doSetFieldValue(Field field, FieldValue value) {
+ super.doSetFieldValue(field, value);
+ }
+
+ @Override
+ public String toXML(String indent) {
+ return doc.toXML(indent);
+ }
+
+ @Override
+ public String toXml() {
+ return doc.toXml();
+ }
+
+ @Override
+ public void printXml(XmlStream xml) {
+ doc.printXml(xml);
+ }
+
+ @Override
+ public void onSerialize(Serializer target) throws SerializationException {
+ doc.onSerialize(target);
+ }
+
+ @Override
+ public void serializeHeader(Serializer target) throws SerializationException {
+ doc.serializeHeader(target);
+ }
+
+ @Override
+ public void serializeBody(Serializer target) throws SerializationException {
+ doc.serializeBody(target);
+ }
+
+ @Override
+ public DocumentType getDataType() {
+ return doc.getDataType();
+ }
+
+ @Override
+ public int getFieldCount() {
+ return doc.getFieldCount();
+ }
+
+ @Override
+ public void serialize(DocumentWriter writer) {
+ doc.serialize(writer);
+ }
+
+ @Override
+ public void deserialize(DocumentReader reader) {
+ doc.deserialize(reader);
+ }
+
+ @Override
+ public void serialize(Field field, FieldWriter writer) {
+ doc.serialize(field, writer);
+ }
+
+ @Override
+ public void deserialize(Field field, FieldReader reader) {
+ doc.deserialize(field, reader);
+ }
+
+ @Override
+ public int compareTo(FieldValue fieldValue) {
+ return super.compareTo(fieldValue);
+ }
+
+ @Override
+ public ModificationStatus iterateNested(FieldPath fieldPath, int pos,
+ FieldPathIteratorHandler handler) {
+ return doc.iterateNested(fieldPath, pos, handler);
+ }
+
+ private static class RemoveHandler extends FieldPathIteratorHandler {
+ private boolean doModifyCalled = false;
+ private FieldValue prevVal;
+
+ @Override
+ public boolean onComplex(FieldValue fieldVal) {
+ return false;
+ }
+
+ @Override
+ public ModificationStatus doModify(FieldValue fieldVal) {
+ doModifyCalled = true;
+ prevVal = fieldVal.clone();
+ return ModificationStatus.REMOVED;
+ }
+ }
+
+ /**
+ * The {@link Document} which this proxies
+ * @return The proxied Document
+ */
+ public Document getDocument() {
+ return doc;
+ }
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java
new file mode 100644
index 00000000000..5892fc349e2
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/proxy/ProxyDocumentUpdate.java
@@ -0,0 +1,119 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.proxy;
+import com.yahoo.docproc.DocumentOperationWrapper;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.Field;
+import com.yahoo.document.serialization.DocumentUpdateWriter;
+import com.yahoo.document.update.FieldUpdate;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Schema mapped facade to a DocumentUpdate
+ * @author vegardh
+ *
+ */
+public class ProxyDocumentUpdate extends DocumentUpdate implements DocumentOperationWrapper {
+
+ private DocumentUpdate docU;
+ /**
+ * The field name map for schema mapping. The key is the field name that the docproc uses. The value is the actual name of the field
+ * in the document.
+ */
+ private Map<String, String> fieldMap;
+
+ public ProxyDocumentUpdate(DocumentUpdate docUpd, Map<String, String> fieldMap) {
+ super(docUpd.getType(), docUpd.getId().toString()+"-schemamappedupdate");
+ this.docU=docUpd;
+ this.fieldMap=fieldMap;
+ }
+
+ @Override
+ public DocumentType getDocumentType() {
+ return docU.getDocumentType();
+ }
+
+ @Override
+ public FieldUpdate getFieldUpdate(Field field) {
+ return getFieldUpdate(field.getName());
+ }
+
+ @Override
+ public FieldUpdate getFieldUpdate(int index) {
+ return docU.getFieldUpdate(index);
+ }
+
+ @Override
+ public FieldUpdate getFieldUpdate(String fieldName) {
+ String mapped = fieldMap.get(fieldName);
+ if (mapped==null) {
+ return docU.getFieldUpdate(fieldName);
+ }
+ // TODO how about structs here?
+ return docU.getFieldUpdate(mapped);
+ }
+
+ @Override
+ public List<FieldUpdate> getFieldUpdates() {
+ return docU.getFieldUpdates();
+ }
+
+ @Override
+ public DocumentId getId() {
+ return docU.getId();
+ }
+
+ @Override
+ public DocumentType getType() {
+ return docU.getType();
+ }
+
+ @Override
+ public DocumentUpdate addFieldUpdate(FieldUpdate fieldUpdate) {
+ return docU.addFieldUpdate(fieldUpdate);
+ }
+
+ @Override
+ public DocumentUpdate applyTo(Document doc) {
+ return docU.applyTo(doc);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return docU.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return docU.hashCode();
+ }
+
+ @Override
+ public void serialize(DocumentUpdateWriter data) {
+ docU.serialize(data);
+ }
+
+ @Override
+ public int size() {
+ return docU.size();
+ }
+
+ @Override
+ public String toString() {
+ return docU.toString();
+ }
+
+ @Override
+ public DocumentOperation getWrappedDocumentOperation() {
+ DocumentOperation innermostDocOp = docU;
+ while (innermostDocOp instanceof DocumentOperationWrapper) {
+ innermostDocOp = ((DocumentOperationWrapper) innermostDocOp).getWrappedDocumentOperation();
+ }
+ return innermostDocOp;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/proxy/SchemaMap.java b/docproc/src/main/java/com/yahoo/docproc/proxy/SchemaMap.java
new file mode 100644
index 00000000000..b27c8b725f2
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/proxy/SchemaMap.java
@@ -0,0 +1,131 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.proxy;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.config.docproc.SchemamappingConfig;
+import com.yahoo.config.docproc.SchemamappingConfig.Fieldmapping;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.logging.Logger;
+
+/**
+ * Can be used to map field names from input doc into names used in a docproc that was
+ * written with generic field names.
+ * @author vegardh
+ *
+ */
+public class SchemaMap implements ConfigSubscriber.SingleSubscriber<SchemamappingConfig> {
+ /**
+ * Map key. Doctype can be null, not the others.
+ * @author vegardh
+ *
+ */
+ class SchemaMapKey {
+
+ private final String chain;
+ private final String docproc;
+ private final String doctype;
+ private final String inDocument;
+
+ public SchemaMapKey(String chain, String docproc, String doctype, String from) {
+ this.chain = chain;
+ this.docproc = docproc;
+ this.doctype = doctype;
+ this.inDocument = from;
+ if (chain==null) throw new IllegalArgumentException("'chain' cannot be null in schema map.");
+ if (docproc==null) throw new IllegalArgumentException("'docproc' cannot be null in schema map.");
+ if (from==null) throw new IllegalArgumentException("'from' cannot be null in schema map.");
+ }
+ public String getChain() {
+ return chain;
+ }
+ public String getDocproc() {
+ return docproc;
+ }
+ public String getDoctype() {
+ return doctype;
+ }
+ public String getInDocument() {
+ return inDocument;
+ }
+
+ private boolean equalType(SchemaMapKey other) {
+ if (doctype==null) return other.getDoctype()==null;
+ return doctype.equals(other.getDoctype());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SchemaMapKey)) return false;
+ SchemaMapKey other = (SchemaMapKey)obj;
+ return other.getChain().equals(chain) &&
+ other.getDocproc().equals(docproc) &&
+ other.getInDocument().equals(inDocument) &&
+ equalType(other);
+ }
+ @Override
+ public int hashCode() {
+ return chain.hashCode()+docproc.hashCode()+(doctype!=null?doctype.hashCode():0)+inDocument.hashCode();
+ }
+
+ }
+
+ // (key->inProcessor),...
+ private final ConfigSubscriber subscriber;
+ private Map<SchemaMapKey, String> fields = new HashMap<>();
+
+ void addMapping(String chain, String docproc, String doctype, String inDocument, String inProcessor) {
+ fields.put(new SchemaMapKey(chain, docproc, doctype, inDocument), inProcessor);
+ }
+
+ /**
+ * New map from the docproc cluster's config id
+ * @param configid can be null. Will not get anything from config in that case.
+ */
+ public SchemaMap(String configid) {
+ subscriber = new ConfigSubscriber();
+ if (configid!=null) {
+ subscriber.subscribe(this, SchemamappingConfig.class, configid);
+ }
+ }
+
+ public SchemaMap() {
+ this(null);
+ }
+
+ @Override
+ public void configure(SchemamappingConfig config) {
+ if (config == null) {
+ return;
+ }
+ fields.clear();
+ for (Fieldmapping m: config.fieldmapping()) {
+ SchemaMapKey key = new SchemaMapKey(m.chain(), m.docproc(), ("".equals(m.doctype())?null:m.doctype()), m.indocument());
+ fields.put(key, m.inprocessor());
+ }
+ }
+
+ /**
+ * The map for a given chain,docproc:
+ * "Reverses" the direction, this is the mapping a docproc should do when a
+ * doc comes in. The doctype is null if not given in map.
+ *
+ * @return (doctype,inProcessor)→inDocument
+ */
+ public Map<Pair<String,String>, String> chainMap(String chain, String docproc) {
+ Map<Pair<String, String>, String> ret = new HashMap<>();
+ for (Entry<SchemaMapKey, String> e : fields.entrySet()) {
+ SchemaMapKey key = e.getKey();
+ if (key.getChain().equals(chain) && key.getDocproc().equals(docproc)) {
+ // Reverse direction here
+ ret.put(new Pair<>(key.getDoctype(),e.getValue()), key.getInDocument());
+ }
+ }
+ return ret;
+ }
+
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/util/JoinerDocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/util/JoinerDocumentProcessor.java
new file mode 100644
index 00000000000..f934d64736f
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/util/JoinerDocumentProcessor.java
@@ -0,0 +1,68 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.util;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.config.docproc.SplitterJoinerDocumentProcessorConfig;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentTypeManagerConfigurer;
+import com.yahoo.document.datatypes.Array;
+import com.yahoo.log.LogLevel;
+
+import java.util.logging.Logger;
+
+import static com.yahoo.docproc.util.SplitterDocumentProcessor.validate;
+import static com.yahoo.docproc.util.SplitterDocumentProcessor.doProcessOuterDocument;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class JoinerDocumentProcessor extends DocumentProcessor {
+
+ private static Logger log = Logger.getLogger(JoinerDocumentProcessor.class.getName());
+ private String documentTypeName;
+ private String arrayFieldName;
+ private String contextFieldName;
+ DocumentTypeManager manager;
+
+ public JoinerDocumentProcessor(SplitterJoinerDocumentProcessorConfig cfg, DocumentmanagerConfig documentmanagerConfig) {
+ super();
+ this.documentTypeName = cfg.documentTypeName();
+ this.arrayFieldName = cfg.arrayFieldName();
+ this.contextFieldName = cfg.contextFieldName();
+ manager = DocumentTypeManagerConfigurer.configureNewManager(documentmanagerConfig);
+ validate(manager, documentTypeName, arrayFieldName);
+ }
+
+ @Override
+ public Progress process(Processing processing) {
+ if ( ! doProcessOuterDocument(processing.getVariable(contextFieldName), documentTypeName)) {
+ return Progress.DONE;
+ }
+
+ DocumentPut outerDoc = (DocumentPut)processing.getVariable(contextFieldName);
+
+ Array<Document> innerDocuments = (Array<Document>) outerDoc.getDocument().getFieldValue(arrayFieldName);
+
+ if (innerDocuments == null) {
+ innerDocuments = (Array<Document>) outerDoc.getDocument().getDataType().getField(arrayFieldName).getDataType().createFieldValue();
+ }
+
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (op instanceof DocumentPut) {
+ innerDocuments.add(((DocumentPut)op).getDocument());
+ } else {
+ log.log(LogLevel.DEBUG, "Skipping: " + op);
+ }
+ }
+ processing.getDocumentOperations().clear();
+ processing.getDocumentOperations().add(outerDoc);
+ processing.removeVariable(contextFieldName);
+ return Progress.DONE;
+ }
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/util/SplitterDocumentProcessor.java b/docproc/src/main/java/com/yahoo/docproc/util/SplitterDocumentProcessor.java
new file mode 100644
index 00000000000..ed45435d6ef
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/util/SplitterDocumentProcessor.java
@@ -0,0 +1,147 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.util;
+
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.config.docproc.SplitterJoinerDocumentProcessorConfig;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.ArrayDataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentTypeManagerConfigurer;
+import com.yahoo.document.datatypes.Array;
+import com.yahoo.log.LogLevel;
+
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class SplitterDocumentProcessor extends DocumentProcessor {
+
+ private static Logger log = Logger.getLogger(SplitterDocumentProcessor.class.getName());
+ private String documentTypeName;
+ private String arrayFieldName;
+ private String contextFieldName;
+ DocumentTypeManager manager;
+
+ public SplitterDocumentProcessor(SplitterJoinerDocumentProcessorConfig cfg, DocumentmanagerConfig documentmanagerConfig) {
+ super();
+ this.documentTypeName = cfg.documentTypeName();
+ this.arrayFieldName = cfg.arrayFieldName();
+ this.contextFieldName = cfg.contextFieldName();
+ this.manager = DocumentTypeManagerConfigurer.configureNewManager(documentmanagerConfig);
+ validate(manager, documentTypeName, arrayFieldName);
+ }
+
+ @Override
+ public Progress process(Processing processing) {
+ if (processing.getDocumentOperations().size() != 1) {
+ //we were given more than one document, return
+ log.log(LogLevel.DEBUG, "More than one document given, returning. (Was given "
+ + processing.getDocumentOperations().size() + " documents).");
+ return Progress.DONE;
+ }
+
+ if (!doProcessOuterDocument(processing.getDocumentOperations().get(0), documentTypeName)) {
+ return Progress.DONE;
+ }
+
+ Document outerDoc = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();;
+
+ Array<Document> innerDocuments = (Array<Document>) outerDoc.getFieldValue(arrayFieldName);
+ if (innerDocuments == null) {
+ //the document does not have the field, return
+ log.log(LogLevel.DEBUG, "The given Document does not have a field value for field "
+ + arrayFieldName + ", returning. (Was given " + outerDoc + ").");
+ return Progress.DONE;
+ }
+
+ if (innerDocuments.size() == 0) {
+ //the array is empty, return
+ log.log(LogLevel.DEBUG, "The given Document does not have any elements in array field "
+ + arrayFieldName + ", returning. (Was given " + outerDoc + ").");
+ return Progress.DONE;
+ }
+
+ split(processing, innerDocuments);
+ return Progress.DONE;
+ }
+
+ private void split(Processing processing, Array<Document> innerDocuments) {
+ processing.setVariable(contextFieldName, processing.getDocumentOperations().get(0));
+ processing.getDocumentOperations().clear();
+ processing.getDocumentOperations().addAll(innerDocuments.stream()
+ .map(DocumentPut::new)
+ .collect(Collectors.toList()));
+
+ innerDocuments.clear();
+ }
+
+
+ static void validate(DocumentTypeManager manager, String documentTypeName, String arrayFieldName) {
+ DocumentType docType = manager.getDocumentType(documentTypeName);
+
+ if (docType == null) {
+ //the document type does not exist, return
+ throw new IllegalStateException("The document type " + documentTypeName + " is not deployed.");
+ }
+
+ if (docType.getField(arrayFieldName) == null) {
+ //the document type does not have the field, return
+ throw new IllegalStateException("The document type " + documentTypeName
+ + " does not have a field named " + arrayFieldName + ".");
+ }
+
+ if (!(docType.getField(arrayFieldName).getDataType() instanceof ArrayDataType)) {
+ //the data type of the field is wrong, return
+ throw new IllegalStateException("The data type of the field named "
+ + arrayFieldName + " in document type " + documentTypeName
+ + " is not an array type");
+ }
+
+ ArrayDataType fieldDataType = (ArrayDataType) docType.getField(arrayFieldName).getDataType();
+
+ if (!(fieldDataType.getNestedType() instanceof DocumentType)) {
+ //the subtype of tye array data type of the field is wrong, return
+ throw new IllegalStateException("The data type of the field named "
+ + arrayFieldName + " in document type " + documentTypeName
+ + " is not an array of Document.");
+ }
+ }
+
+ static boolean doProcessOuterDocument(Object o, String documentTypeName) {
+ if ( ! (o instanceof DocumentOperation)) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, o + " is not a DocumentOperation.");
+ }
+ return false;
+ }
+
+ DocumentOperation outerDocOp = (DocumentOperation)o;
+ if ( ! (outerDocOp instanceof DocumentPut)) {
+ //this is not a put, return
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Given DocumentOperation is not a DocumentPut, returning. (Was given "
+ + outerDocOp + ").");
+ }
+ return false;
+ }
+
+ Document outerDoc = ((DocumentPut) outerDocOp).getDocument();
+ DocumentType type = outerDoc.getDataType();
+ if (!type.getName().equalsIgnoreCase(documentTypeName)) {
+ //this is not the right document type
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, "Given Document is of wrong type, returning. (Was given " + outerDoc + ").");
+ }
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/docproc/src/main/java/com/yahoo/docproc/util/package-info.java b/docproc/src/main/java/com/yahoo/docproc/util/package-info.java
new file mode 100644
index 00000000000..3becb4f2d15
--- /dev/null
+++ b/docproc/src/main/java/com/yahoo/docproc/util/package-info.java
@@ -0,0 +1,7 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi
+package com.yahoo.docproc.util;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/docproc/src/main/resources/configdefinitions/docproc.def b/docproc/src/main/resources/configdefinitions/docproc.def
new file mode 100644
index 00000000000..a3e89c22226
--- /dev/null
+++ b/docproc/src/main/resources/configdefinitions/docproc.def
@@ -0,0 +1,12 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+version=12
+namespace=config.docproc
+
+# Queue size (in milliseconds) for this node
+# Positive size gives a ThroughPutLimitQueue. ### Experimental.
+# 0 Gives a PriorityQueue
+# Negative values gives a ordinary LinkedBlockingQueue. This is fastest and most efficient.
+maxqueuetimems int default=-1
+
+#The number of threads in the DocprocHandler worker thread pool
+numthreads int default=-1
diff --git a/docproc/src/main/resources/configdefinitions/schemamapping.def b/docproc/src/main/resources/configdefinitions/schemamapping.def
new file mode 100644
index 00000000000..a8e0243afac
--- /dev/null
+++ b/docproc/src/main/resources/configdefinitions/schemamapping.def
@@ -0,0 +1,20 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+# Use when a docproc works on a generic set of field names and the actual names
+# in input doc may be different
+version=1
+namespace=config.docproc
+
+# The chain this mapping applies to
+fieldmapping[].chain string
+
+# The class name of the docproc this mapping applies to
+fieldmapping[].docproc string
+
+#The doc type name this mapping applies to
+fieldmapping[].doctype string default=""
+
+# Field name in input doc
+fieldmapping[].indocument string
+
+# Field name a docproc uses
+fieldmapping[].inprocessor string
diff --git a/docproc/src/main/resources/configdefinitions/splitter-joiner-document-processor.def b/docproc/src/main/resources/configdefinitions/splitter-joiner-document-processor.def
new file mode 100644
index 00000000000..73e84578ed9
--- /dev/null
+++ b/docproc/src/main/resources/configdefinitions/splitter-joiner-document-processor.def
@@ -0,0 +1,12 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+version=2
+namespace=config.docproc
+
+# The name of the enclosing (outer) document type
+documentTypeName string
+
+# The name of the field of type array of document
+arrayFieldName string
+
+# The name of the context variable used by Processing.getVariable()
+contextFieldName string default="docproc@splitter@joiner@outer@document"
diff --git a/docproc/src/test/cfg/docproc-chain-arguments.cfg b/docproc/src/test/cfg/docproc-chain-arguments.cfg
new file mode 100644
index 00000000000..bbd66d7591d
--- /dev/null
+++ b/docproc/src/test/cfg/docproc-chain-arguments.cfg
@@ -0,0 +1,20 @@
+docprocchain[1]
+docprocchain[0].name "chain-arguments"
+docprocchain[0].processor[3]
+
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.DocprocServiceArgumentConfigurationTestCase$TestDocumentProcessor1"
+docprocchain[0].processor[0].argument[2]
+docprocchain[0].processor[0].argument[0].name arg1
+docprocchain[0].processor[0].argument[0].value val1
+docprocchain[0].processor[0].argument[1].name arg2
+docprocchain[0].processor[0].argument[1].value val2
+
+docprocchain[0].processor[1].classname "com.yahoo.docproc.configuration.test.DocprocServiceArgumentConfigurationTestCase$TestDocumentProcessor2"
+docprocchain[0].processor[1].argument[1]
+docprocchain[0].processor[1].argument[0].name arg1
+docprocchain[0].processor[1].argument[0].value val3
+
+docprocchain[0].processor[2].classname "com.yahoo.docproc.configuration.test.DocprocServiceArgumentConfigurationTestCase$TestDocumentProcessor3"
+docprocchain[0].processor[2].argument[1]
+docprocchain[0].processor[2].argument[0].name arg2
+docprocchain[0].processor[2].argument[0].value val4
diff --git a/docproc/src/test/cfg/docproc-chain-empty.cfg b/docproc/src/test/cfg/docproc-chain-empty.cfg
new file mode 100644
index 00000000000..091ed109913
--- /dev/null
+++ b/docproc/src/test/cfg/docproc-chain-empty.cfg
@@ -0,0 +1,6 @@
+docprocchain[2]
+docprocchain[0].name "baloo"
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor1"
+docprocchain[1].name "badoo"
+docprocchain[1].processor[0]
diff --git a/docproc/src/test/cfg/docproc-chain-parameters.cfg b/docproc/src/test/cfg/docproc-chain-parameters.cfg
new file mode 100644
index 00000000000..0780030bd3b
--- /dev/null
+++ b/docproc/src/test/cfg/docproc-chain-parameters.cfg
@@ -0,0 +1,24 @@
+docprocchain[1]
+docprocchain[0].name "parameters"
+docprocchain[0].processor[3]
+
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.DocprocServiceParametersConfigurationTestCase$TestDocumentProcessor1"
+docprocchain[0].processor[0].parameter[4]
+docprocchain[0].processor[0].parameter[0].name param1
+docprocchain[0].processor[0].parameter[0].value val1
+docprocchain[0].processor[0].parameter[1].name param1
+docprocchain[0].processor[0].parameter[1].value val2
+docprocchain[0].processor[0].parameter[2].name param1
+docprocchain[0].processor[0].parameter[2].value val3
+docprocchain[0].processor[0].parameter[3].name param2
+docprocchain[0].processor[0].parameter[3].value val0
+
+docprocchain[0].processor[1].classname "com.yahoo.docproc.configuration.test.DocprocServiceParametersConfigurationTestCase$TestDocumentProcessor2"
+docprocchain[0].processor[1].parameter[1]
+docprocchain[0].processor[1].parameter[0].name param1
+docprocchain[0].processor[1].parameter[0].value val3
+
+docprocchain[0].processor[2].classname "com.yahoo.docproc.configuration.test.DocprocServiceParametersConfigurationTestCase$TestDocumentProcessor3"
+docprocchain[0].processor[2].parameter[1]
+docprocchain[0].processor[2].parameter[0].name param2
+docprocchain[0].processor[2].parameter[0].value val4
diff --git a/docproc/src/test/cfg/docproc-chain-slow.cfg b/docproc/src/test/cfg/docproc-chain-slow.cfg
new file mode 100644
index 00000000000..27cedd9407c
--- /dev/null
+++ b/docproc/src/test/cfg/docproc-chain-slow.cfg
@@ -0,0 +1,4 @@
+docprocchain[1]
+docprocchain[0].name "slow"
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.DocprocServiceConfigurationTestCase$SlowDocumentProcessor"
diff --git a/docproc/src/test/cfg/docproc-chain.cfg b/docproc/src/test/cfg/docproc-chain.cfg
new file mode 100644
index 00000000000..ed40e889004
--- /dev/null
+++ b/docproc/src/test/cfg/docproc-chain.cfg
@@ -0,0 +1,5 @@
+docprocchain[1]
+docprocchain[0].processor[3]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor1"
+docprocchain[0].processor[1].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor2"
+docprocchain[0].processor[2].classname "com.yahoo.docproc.test.DocumentProcessingAbstractTestCase$TestDocumentProcessor3"
diff --git a/docproc/src/test/cfg/invalid/docproc-chain-nomapconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-nomapconstructor.cfg
new file mode 100644
index 00000000000..21b6b2e4e57
--- /dev/null
+++ b/docproc/src/test/cfg/invalid/docproc-chain-nomapconstructor.cfg
@@ -0,0 +1,7 @@
+docprocchain[1]
+docprocchain[0].name "nomapconstructor"
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$NoMapConstructorDocumentProcessor"
+docprocchain[0].processor[0].parameter[1]
+docprocchain[0].processor[0].parameter[0].name "dis"
+docprocchain[0].processor[0].parameter[0].value "dat"
diff --git a/docproc/src/test/cfg/invalid/docproc-chain-nopublicconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-nopublicconstructor.cfg
new file mode 100644
index 00000000000..c710a4726a7
--- /dev/null
+++ b/docproc/src/test/cfg/invalid/docproc-chain-nopublicconstructor.cfg
@@ -0,0 +1,4 @@
+docprocchain[1]
+docprocchain[0].name "nopublicconstructor"
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$NoPublicConstructorDocumentProcessor"
diff --git a/docproc/src/test/cfg/invalid/docproc-chain-nostringconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-nostringconstructor.cfg
new file mode 100644
index 00000000000..002210daf1f
--- /dev/null
+++ b/docproc/src/test/cfg/invalid/docproc-chain-nostringconstructor.cfg
@@ -0,0 +1,4 @@
+docprocchain[1]
+docprocchain[0].name "nostringconstructor"
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$NoStringConstructorDocumentProcessor"
diff --git a/docproc/src/test/cfg/invalid/docproc-chain-ok.cfg b/docproc/src/test/cfg/invalid/docproc-chain-ok.cfg
new file mode 100644
index 00000000000..b6a2a71d87f
--- /dev/null
+++ b/docproc/src/test/cfg/invalid/docproc-chain-ok.cfg
@@ -0,0 +1,4 @@
+docprocchain[1]
+docprocchain[0].name "ok"
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$OkDocumentProcessor"
diff --git a/docproc/src/test/cfg/invalid/docproc-chain-throwingexceptioninconstructor.cfg b/docproc/src/test/cfg/invalid/docproc-chain-throwingexceptioninconstructor.cfg
new file mode 100644
index 00000000000..a8296c14d20
--- /dev/null
+++ b/docproc/src/test/cfg/invalid/docproc-chain-throwingexceptioninconstructor.cfg
@@ -0,0 +1,4 @@
+docprocchain[1]
+docprocchain[0].name "throwingexcaptioninconstructor"
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.configuration.test.InvalidChainTestCase$ThrowingExceptionInConstructorDocumentProcessor"
diff --git a/docproc/src/test/cfg/messagebus/docproc-chain.cfg b/docproc/src/test/cfg/messagebus/docproc-chain.cfg
new file mode 100644
index 00000000000..8496ebfb05c
--- /dev/null
+++ b/docproc/src/test/cfg/messagebus/docproc-chain.cfg
@@ -0,0 +1,2 @@
+docprocchain[1]
+docprocchain[0].processor[0]
diff --git a/docproc/src/test/cfg/messagebus/docproc.cfg b/docproc/src/test/cfg/messagebus/docproc.cfg
new file mode 100644
index 00000000000..ef67a6993b0
--- /dev/null
+++ b/docproc/src/test/cfg/messagebus/docproc.cfg
@@ -0,0 +1,4 @@
+rpcserver.enabled false
+rpcserver.port 6985
+slobrok.enabled false
+slobrok.servicename "dpcluster.1/docproc/78"
diff --git a/docproc/src/test/cfg/messagebus/documentmanager.cfg b/docproc/src/test/cfg/messagebus/documentmanager.cfg
new file mode 100644
index 00000000000..36cf88c8384
--- /dev/null
+++ b/docproc/src/test/cfg/messagebus/documentmanager.cfg
@@ -0,0 +1,36 @@
+datatype[3]
+datatype[0].id 306916075
+datatype[0].arraytype[0]
+datatype[0].weightedsettype[0]
+datatype[0].structtype[1]
+datatype[0].structtype[0].name test.header
+datatype[0].structtype[0].version 0
+datatype[0].structtype[0].field[3]
+datatype[0].structtype[0].field[0].name test
+datatype[0].structtype[0].field[0].id[0]
+datatype[0].structtype[0].field[0].datatype 2
+datatype[0].structtype[0].field[1].name touched
+datatype[0].structtype[0].field[1].id[0]
+datatype[0].structtype[0].field[1].datatype 2
+datatype[0].structtype[0].field[2].name docno
+datatype[0].structtype[0].field[2].id[0]
+datatype[0].structtype[0].field[2].datatype 0
+datatype[0].documenttype[0]
+datatype[1].id -1270491200
+datatype[1].arraytype[0]
+datatype[1].weightedsettype[0]
+datatype[1].structtype[1]
+datatype[1].structtype[0].name test.body
+datatype[1].structtype[0].version 0
+datatype[1].structtype[0].field[0]
+datatype[1].documenttype[0]
+datatype[2].id -877171244
+datatype[2].arraytype[0]
+datatype[2].weightedsettype[0]
+datatype[2].structtype[0]
+datatype[2].documenttype[1]
+datatype[2].documenttype[0].name test
+datatype[2].documenttype[0].version 0
+datatype[2].documenttype[0].inherits[0]
+datatype[2].documenttype[0].headerstruct 306916075
+datatype[2].documenttype[0].bodystruct -1270491200
diff --git a/docproc/src/test/cfg/server/docproc-chain.cfg b/docproc/src/test/cfg/server/docproc-chain.cfg
new file mode 100644
index 00000000000..0c7b14eb657
--- /dev/null
+++ b/docproc/src/test/cfg/server/docproc-chain.cfg
@@ -0,0 +1,3 @@
+docprocchain[1]
+docprocchain[0].processor[1]
+docprocchain[0].processor[0].classname "com.yahoo.docproc.server.test.ServerTestCase$BalooDocumentProcessor"
diff --git a/docproc/src/test/cfg/server/docproc.cfg b/docproc/src/test/cfg/server/docproc.cfg
new file mode 100644
index 00000000000..602f6fc4f10
--- /dev/null
+++ b/docproc/src/test/cfg/server/docproc.cfg
@@ -0,0 +1,4 @@
+rpcserver.enabled true
+rpcserver.port 6985
+slobrok.enabled false
+slobrok.servicename dpcluster.1/docproc/78
diff --git a/docproc/src/test/cfg/server/documentmanager.cfg b/docproc/src/test/cfg/server/documentmanager.cfg
new file mode 100644
index 00000000000..b3e360e961f
--- /dev/null
+++ b/docproc/src/test/cfg/server/documentmanager.cfg
@@ -0,0 +1,36 @@
+datatype[3]
+datatype[0].id 306916075
+datatype[0].arraytype[0]
+datatype[0].weightedsettype[0]
+datatype[0].structtype[1]
+datatype[0].structtype[0].name test.header
+datatype[0].structtype[0].version 0
+datatype[0].structtype[0].field[2]
+datatype[0].structtype[0].field[0].name test
+datatype[0].structtype[0].field[0].id[0]
+datatype[0].structtype[0].field[0].datatype 2
+datatype[0].structtype[0].field[1].name touched
+datatype[0].structtype[0].field[1].id[0]
+datatype[0].structtype[0].field[1].datatype 2
+datatype[0].documenttype[0]
+datatype[1].id -1270491200
+datatype[1].arraytype[0]
+datatype[1].weightedsettype[0]
+datatype[1].structtype[1]
+datatype[1].structtype[0].name test.body
+datatype[1].structtype[0].version 0
+datatype[1].structtype[0].field[1]
+datatype[1].structtype[0].field[0].name docno
+datatype[1].structtype[0].field[0].id[0]
+datatype[1].structtype[0].field[0].datatype 0
+datatype[1].documenttype[0]
+datatype[2].id -877171244
+datatype[2].arraytype[0]
+datatype[2].weightedsettype[0]
+datatype[2].structtype[0]
+datatype[2].documenttype[1]
+datatype[2].documenttype[0].name test
+datatype[2].documenttype[0].version 0
+datatype[2].documenttype[0].inherits[0]
+datatype[2].documenttype[0].headerstruct 306916075
+datatype[2].documenttype[0].bodystruct -1270491200
diff --git a/docproc/src/test/java/com/yahoo/docproc/AccessesAnnotationTestCase.java b/docproc/src/test/java/com/yahoo/docproc/AccessesAnnotationTestCase.java
new file mode 100644
index 00000000000..fedc0eb21f5
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/AccessesAnnotationTestCase.java
@@ -0,0 +1,68 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.datatypes.IntegerFieldValue;
+import com.yahoo.document.datatypes.StringFieldValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class AccessesAnnotationTestCase {
+
+ @Test
+ public void requireThatFieldsAreRestricted() {
+ DocumentType type = new DocumentType("album");
+ type.addField("title", DataType.STRING);
+ type.addField("artist", DataType.STRING);
+ type.addField("year", DataType.INT);
+ Document doc = new Document(type, new DocumentId("doc:map:test:1"));
+
+ MyDocProc docProc = new MyDocProc();
+ DocumentPut put = new DocumentPut(doc);
+ Document proxy = new Call(docProc).configDoc(docProc, put).getDocument();
+ proxy.setFieldValue("title", new StringFieldValue("foo"));
+ try {
+ proxy.setFieldValue("year", new IntegerFieldValue(69));
+ fail("Should have failed");
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ assertTrue(e.getMessage().matches(".*not allowed.*"));
+ }
+
+ proxy.getFieldValue("title");
+ try {
+ proxy.getFieldValue("year");
+ fail("Should have failed");
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ assertTrue(e.getMessage().matches(".*not allowed.*"));
+ }
+ }
+
+ @Accesses({
+ @Accesses.Field(name = "title", dataType = "string", description = "What is done on field title",
+ annotations = @Accesses.Field.Tree(produces = { "sentences" })),
+ @Accesses.Field(name = "artist", dataType = "string", description = "What is done on field artist",
+ annotations = {
+ @Accesses.Field.Tree(name = "root", produces = { "sentences" }),
+ @Accesses.Field.Tree(name = "root2", consumes = { "places" })
+ }),
+ @Accesses.Field(name = "track", dataType = "string", description = "What is done on field track")
+ })
+ class MyDocProc extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ return null;
+ }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/CallStackTestCase.java b/docproc/src/test/java/com/yahoo/docproc/CallStackTestCase.java
new file mode 100644
index 00000000000..8ef832faf31
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/CallStackTestCase.java
@@ -0,0 +1,291 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import java.util.Iterator;
+
+/**
+ * Tests call stacks
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a>
+ */
+public class CallStackTestCase extends junit.framework.TestCase {
+
+ private CallStack callStack, insertStack;
+
+ private Call call1, call2, call3, noCall, newCall, insert1, insert2, insert3;
+
+ private DocumentProcessor processor2, noProcessor;
+
+ public CallStackTestCase(String name) {
+ super(name);
+ }
+
+ protected void setUp() {
+ callStack = new CallStack();
+ call1 = new Call(new TestDocumentProcessor());
+ processor2 = new TestDocumentProcessor();
+ call2 = new Call(processor2);
+ call3 = new Call(new TestDocumentProcessor());
+ callStack.addLast(call1).addLast(call2).addLast(call3);
+ noProcessor = new TestDocumentProcessor();
+ noCall = new Call(noProcessor);
+ newCall = new Call(new TestDocumentProcessor());
+
+ insert1 = new Call(new TestDocumentProcessor());
+ insert2 = new Call(new TestDocumentProcessor());
+ insert3 = new Call(new TestDocumentProcessor());
+ insertStack = new CallStack();
+ insertStack.addLast(insert1).addLast(insert2).addLast(insert3);
+ }
+
+ public void testFind() {
+ assertSame(call2, callStack.findCall(processor2));
+ assertSame(call2, callStack.findCall(processor2.getId()));
+ assertNull(callStack.findCall(noProcessor));
+ assertNull(callStack.findCall(noProcessor.getId()));
+ assertNull(callStack.findCall(new TestDocumentProcessor()));
+ }
+
+ public void testAddBefore() {
+ callStack.addBefore(call2, newCall);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(newCall, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddStackBefore() {
+ callStack.addBefore(call2, insertStack);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(insert1, i.next());
+ assertEquals(insert2, i.next());
+ assertEquals(insert3, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddAfter() {
+ callStack.addAfter(call2, newCall);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(newCall, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddStackAfter() {
+ callStack.addAfter(call2, insertStack);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(insert1, i.next());
+ assertEquals(insert2, i.next());
+ assertEquals(insert3, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddBeforeFirst() {
+ callStack.addBefore(call1, newCall);
+ Iterator i = callStack.iterator();
+ assertEquals(newCall, i.next());
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddStackBeforeFirst() {
+ callStack.addBefore(call1, insertStack);
+ Iterator i = callStack.iterator();
+ assertEquals(insert1, i.next());
+ assertEquals(insert2, i.next());
+ assertEquals(insert3, i.next());
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddAfterLast() {
+ callStack.addAfter(call3, newCall);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertEquals(newCall, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddStackAfterLast() {
+ callStack.addAfter(call3, insertStack);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertEquals(insert1, i.next());
+ assertEquals(insert2, i.next());
+ assertEquals(insert3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddBeforeNonExisting() {
+ callStack.addBefore(noCall, newCall);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertEquals(newCall, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddStackBeforeNonExisting() {
+ callStack.addBefore(noCall, insertStack);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertEquals(insert1, i.next());
+ assertEquals(insert2, i.next());
+ assertEquals(insert3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddAfterNonExisting() {
+ callStack.addAfter(noCall, newCall);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertEquals(newCall, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testAddStackAfterNonExisting() {
+ callStack.addAfter(noCall, insertStack);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertEquals(insert1, i.next());
+ assertEquals(insert2, i.next());
+ assertEquals(insert3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testRemove() {
+ callStack.remove(call1);
+ Iterator i = callStack.iterator();
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testRemoveNonExisting() {
+ callStack.remove(noCall);
+ Iterator i = callStack.iterator();
+ assertEquals(call1, i.next());
+ assertEquals(call2, i.next());
+ assertEquals(call3, i.next());
+ assertFalse(i.hasNext());
+ }
+
+ public void testContains() {
+ callStack.addLast(newCall);
+ assertTrue(callStack.contains(call1));
+ assertTrue(callStack.contains(call2));
+ assertTrue(callStack.contains(call3));
+ assertTrue(callStack.contains(newCall));
+ assertFalse(callStack.contains(noCall));
+ }
+
+ public void testPop() {
+ assertEquals(call1, callStack.pop());
+ assertEquals(call2, callStack.pop());
+ callStack.addNext(newCall);
+
+ assertFalse(callStack.contains(call1));
+ assertFalse(callStack.contains(call2));
+ assertTrue(callStack.contains(call3));
+ assertTrue(callStack.contains(newCall));
+
+ assertEquals(newCall, callStack.pop());
+ assertTrue(callStack.contains(call3));
+ assertFalse(callStack.contains(newCall));
+
+ assertEquals(call3, callStack.pop());
+ assertFalse(callStack.contains(call3));
+
+ assertNull(callStack.pop());
+ }
+
+ public void testGetLastPopped() {
+ CallStack stakk = new CallStack();
+ assertNull(stakk.getLastPopped());
+
+ Call call;
+ Call lastCall;
+
+ call = callStack.pop();
+ assertEquals(call1, call);
+ lastCall = callStack.getLastPopped();
+ assertEquals(call1, lastCall);
+ assertEquals(call, lastCall);
+
+ call = callStack.pop();
+ assertEquals(call2, call);
+ lastCall = callStack.getLastPopped();
+ assertEquals(call2, lastCall);
+ assertEquals(call, lastCall);
+
+ call = callStack.pop();
+ assertEquals(call3, call);
+ lastCall = callStack.getLastPopped();
+ assertEquals(call3, lastCall);
+ assertEquals(call, lastCall);
+
+ lastCall = callStack.getLastPopped();
+ assertEquals(call3, lastCall);
+ assertEquals(call, lastCall);
+
+ lastCall = callStack.getLastPopped();
+ assertEquals(call3, lastCall);
+ assertEquals(call, lastCall);
+
+ callStack.addLast(call1);
+ callStack.addLast(call2);
+
+ lastCall = callStack.getLastPopped();
+ assertEquals(call3, lastCall);
+ assertEquals(call, lastCall);
+
+ call = callStack.pop();
+ assertEquals(call1, call);
+ lastCall = callStack.getLastPopped();
+ assertEquals(call1, lastCall);
+ assertEquals(call, lastCall);
+
+ call = callStack.pop();
+ assertEquals(call2, call);
+ lastCall = callStack.getLastPopped();
+ assertEquals(call2, lastCall);
+ assertEquals(call, lastCall);
+
+ lastCall = callStack.getLastPopped();
+ assertEquals(call2, lastCall);
+ assertEquals(call, lastCall);
+
+ lastCall = callStack.getLastPopped();
+ assertEquals(call2, lastCall);
+ assertEquals(call, lastCall);
+ }
+
+ private static class TestDocumentProcessor extends com.yahoo.docproc.SimpleDocumentProcessor {
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java b/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java
new file mode 100644
index 00000000000..c51682b77c8
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/CallbackTestCase.java
@@ -0,0 +1,90 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.datatypes.StringFieldValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class CallbackTestCase extends junit.framework.TestCase {
+ private DocumentPut put1;
+ private DocumentPut put2;
+ private List<DocumentOperation> operations = new ArrayList<>(2);
+ DocprocService service;
+
+
+ public void setUp() {
+ service = new DocprocService("callback");
+ service.setCallStack(new CallStack().addNext(new TestCallbackDp()));
+ service.setInService(true);
+
+ // Create documents
+ DocumentType type = new DocumentType("test");
+ type.addField("status", DataType.STRING);
+ put1 = new DocumentPut(type, new DocumentId("doc:callback:test:1"));
+ put2 = new DocumentPut(type, new DocumentId("doc:callback:test:2"));
+ operations.add(new DocumentPut(type, new DocumentId("doc:callback:test:3")));
+ operations.add(new DocumentPut(type, new DocumentId("doc:callback:test:4")));
+ }
+
+ public void testProcessingWithCallbackSingleDoc() {
+ ProcessingEndpoint drecv = new TestProcessingEndpoint();
+
+ service.process(put1, drecv);
+ while (service.doWork()) { }
+
+ assertEquals(new StringFieldValue("received"), put1.getDocument().getFieldValue("status"));
+ }
+
+ public void testProcessingWithCallbackMultipleDocs() {
+ ProcessingEndpoint drecv = new TestProcessingEndpoint();
+
+ service.process(toProcessing(operations), drecv);
+ while (service.doWork()) { }
+
+ assertEquals(new StringFieldValue("received"), ((DocumentPut) operations.get(0)).getDocument().getFieldValue("status"));
+ assertEquals(new StringFieldValue("received"), ((DocumentPut) operations.get(1)).getDocument().getFieldValue("status"));
+ }
+
+ private Processing toProcessing(List<DocumentOperation> documentOperations) {
+ Processing processing = new Processing();
+ for (DocumentOperation op : documentOperations)
+ processing.addDocumentOperation(op);
+ return processing;
+ }
+
+ public void testProcessingWithCallbackProcessing() {
+ ProcessingEndpoint drecv = new TestProcessingEndpoint();
+
+ Processing processing = new Processing("default", put2, service.getCallStack());
+
+ service.process(processing, drecv);
+ while (service.doWork()) { }
+
+ assertEquals(new StringFieldValue("received"), put2.getDocument().getFieldValue("status"));
+ }
+
+ public class TestProcessingEndpoint implements ProcessingEndpoint {
+ public void processingDone(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ ((DocumentPut)op).getDocument().setFieldValue("status", new StringFieldValue("received"));
+ }
+ }
+
+ public void processingFailed(Processing processing, Exception exception) {
+ //do nothing here for now
+ }
+ }
+
+ public static class TestCallbackDp extends com.yahoo.docproc.SimpleDocumentProcessor {
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java b/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java
new file mode 100644
index 00000000000..cce3460d3c4
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/DocumentProcessingAbstractTestCase.java
@@ -0,0 +1,86 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.datatypes.StringFieldValue;
+
+/**
+ * Convenience superclass of document processor test cases
+ *
+ * @author bratseth
+ */
+public abstract class DocumentProcessingAbstractTestCase extends junit.framework.TestCase {
+
+ public DocumentProcessingAbstractTestCase(String name) {
+ super(name);
+ }
+
+ /**
+ * Asserts that a document processing service works
+ */
+ protected void assertProcessingWorks(DocprocService service) {
+ // Create documents
+ DocumentType type = new DocumentType("test");
+ type.addField("test", DataType.STRING);
+ DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:test:test:1"));
+ DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:test:test:2"));
+ DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:test:test:3"));
+
+ // Process them
+ service.process(put1);
+ service.process(put2);
+ service.process(put3);
+ while (service.doWork()) {}
+
+ // Verify
+ assertEquals(new StringFieldValue("done"), put1.getDocument().getFieldValue("test"));
+ assertEquals(new StringFieldValue("done"), put2.getDocument().getFieldValue("test"));
+ assertEquals(new StringFieldValue("done"), put3.getDocument().getFieldValue("test"));
+ }
+
+ public static class TestDocumentProcessor1 extends DocumentProcessor {
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (processing.getVariable("processor1") == null) {
+ processing.setVariable("processor1", "called");
+ return Progress.LATER;
+ }
+ processing.setVariable("processor1", "calledTwice");
+ }
+ return Progress.DONE;
+ }
+ }
+
+ public static class TestDocumentProcessor2 extends DocumentProcessor {
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ assertEquals("calledTwice", processing.getVariable("processor1"));
+ processing.setVariable("processor2", "called");
+ }
+ return Progress.DONE;
+ }
+ }
+
+ public static class TestDocumentProcessor3 extends DocumentProcessor {
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ assertEquals("called", processing.getVariable("processor2"));
+ if (processing.getVariable("processor3") == null) {
+ processing.setVariable("processor3", "called");
+ return Progress.LATER;
+ }
+ ((DocumentPut)op).getDocument().setFieldValue("test", new StringFieldValue("done"));
+ }
+ return Progress.DONE;
+ }
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java
new file mode 100644
index 00000000000..060f3a9747c
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/EmptyProcessingTestCase.java
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import org.junit.Test;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class EmptyProcessingTestCase {
+
+ @Test
+ public void emptyProcessing() {
+ DocprocService service = new DocprocService("juba");
+ DocumentProcessor processor = new IncrementingDocumentProcessor();
+ CallStack stack = new CallStack("juba");
+ stack.addLast(processor);
+ service.setCallStack(stack);
+ service.setInService(true);
+
+ Processing proc = new Processing();
+ proc.setServiceName("juba");
+
+ service.process(proc);
+
+ while (service.doWork()) { }
+
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java
new file mode 100644
index 00000000000..1b7a005350e
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingTestCase.java
@@ -0,0 +1,88 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.datatypes.StringFieldValue;
+
+/**
+ * Tests a document processing where some processings fail with an exception
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a>
+ */
+public class FailingDocumentProcessingTestCase extends junit.framework.TestCase {
+
+ public FailingDocumentProcessingTestCase(String name) {
+ super(name);
+ }
+
+ /**
+ * Tests chaining of some processors, and execution of the processors
+ * on some documents
+ */
+ public void testFailingProcessing() {
+ // Set up service programmatically
+ DocprocService service = new DocprocService("failing");
+ DocumentProcessor first = new SettingValueProcessor("done 1");
+ DocumentProcessor second = new FailingProcessor("done 2");
+ DocumentProcessor third = new SettingValueProcessor("done 3");
+ service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third));
+ service.setInService(true);
+
+ assertProcessingWorks(service);
+ }
+
+ protected void assertProcessingWorks(DocprocService service) {
+ // Create documents
+ DocumentType type = new DocumentType("test");
+ type.addField("test", DataType.STRING);
+ DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:failing:test:1"));
+ DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:failing:test:2"));
+ DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:failing:test:3"));
+
+ // Process them
+ service.process(put1);
+ service.process(put2);
+ service.process(put3);
+ while (service.doWork()) {}
+
+ // Verify
+ assertEquals(new StringFieldValue("done 3"), put1.getDocument().getFieldValue("test"));
+ assertEquals(new StringFieldValue("done 2"), put2.getDocument().getFieldValue("test")); // Due to exception in 2
+ assertEquals(new StringFieldValue("done 3"), put3.getDocument().getFieldValue("test"));
+ }
+
+ public static class SettingValueProcessor extends SimpleDocumentProcessor {
+
+ private String value;
+
+ public SettingValueProcessor(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public void process(DocumentPut put) {
+ put.getDocument().setFieldValue("test", value);
+ }
+ }
+
+ public static class FailingProcessor extends SettingValueProcessor {
+
+ public FailingProcessor(String name) {
+ super(name);
+ }
+
+ @Override
+ public void process(DocumentPut put) {
+ super.process(put);
+ if (put.getId().toString().equals("doc:failing:test:2")) {
+ throw new HandledProcessingException("Failed at receiving document test:2");
+ }
+ }
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java
new file mode 100644
index 00000000000..1db47b0dfb2
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/FailingDocumentProcessingWithoutExceptionTestCase.java
@@ -0,0 +1,94 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.datatypes.StringFieldValue;
+
+/**
+ * Tests a document processing where some processings fail with an exception
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M. R. Rosenvinge</a>
+ */
+public class FailingDocumentProcessingWithoutExceptionTestCase extends junit.framework.TestCase {
+
+ public FailingDocumentProcessingWithoutExceptionTestCase(String name) {
+ super(name);
+ }
+
+ /**
+ * Tests chaining of some processors, and execution of the processors
+ * on some documents
+ */
+ public void testFailingProcessing() {
+ // Set up service programmatically
+ DocprocService service = new DocprocService("failing-no-exception");
+ DocumentProcessor first = new SettingValueProcessor("done 1");
+ DocumentProcessor second = new FailingProcessor("done 2");
+ DocumentProcessor third = new SettingValueProcessor("done 3");
+ service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third));
+ service.setInService(true);
+
+ assertProcessingWorks(service);
+ }
+
+ protected void assertProcessingWorks(DocprocService service) {
+ // Create documents
+ DocumentType type = new DocumentType("test");
+ type.addField("test", DataType.STRING);
+ DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:woexception:test:1"));
+ DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:woexception:test:2"));
+ DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:woexception:test:3"));
+
+ // Process them
+ service.process(put1);
+ service.process(put2);
+ service.process(put3);
+ while (service.doWork()) {}
+
+ // Verify
+ assertEquals(new StringFieldValue("done 3"), put1.getDocument().getFieldValue("test"));
+ assertEquals(new StringFieldValue("done 2"), put2.getDocument().getFieldValue("test")); // Due to PROCESSING_FAILED in 2
+ assertEquals(new StringFieldValue("done 3"), put3.getDocument().getFieldValue("test"));
+ }
+
+ public static class SettingValueProcessor extends DocumentProcessor {
+
+ private String value;
+
+ public SettingValueProcessor(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ ((DocumentPut)op).getDocument().setFieldValue("test", value);
+ }
+ return Progress.DONE;
+ }
+ }
+
+ public static class FailingProcessor extends SettingValueProcessor {
+
+ public FailingProcessor(String name) {
+ super(name);
+ }
+
+ @Override
+ public Progress process(Processing processing) {
+ super.process(processing);
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (op.getId().toString().equals("doc:woexception:test:2")) {
+ return DocumentProcessor.Progress.FAILED;
+ }
+ }
+ return DocumentProcessor.Progress.DONE;
+ }
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java
new file mode 100644
index 00000000000..0e84ba04647
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/FailingPermanentlyDocumentProcessingTestCase.java
@@ -0,0 +1,101 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.datatypes.StringFieldValue;
+
+/**
+ * Tests a document processing where some processings fail permanently
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M. R. Rosenvinge</a>
+ */
+public class FailingPermanentlyDocumentProcessingTestCase extends junit.framework.TestCase {
+
+ public FailingPermanentlyDocumentProcessingTestCase(String name) {
+ super(name);
+ }
+
+ /**
+ * Tests chaining of some processors, and execution of the processors
+ * on some documents
+ */
+ public void testFailingProcessing() {
+ // Set up service programmatically
+ DocprocService service = new DocprocService("failing-permanently");
+ DocumentProcessor first = new SettingValueProcessor("done 1");
+ DocumentProcessor second = new FailingProcessor("done 2");
+ DocumentProcessor third = new SettingValueProcessor("done 3");
+ service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third));
+ service.setInService(true);
+
+ assertProcessingWorks(service);
+ }
+
+ protected void assertProcessingWorks(DocprocService service) {
+ // Create documents
+ DocumentType type = new DocumentType("test");
+ type.addField("test", DataType.STRING);
+ DocumentPut put1 = new DocumentPut(type, new DocumentId("doc:permanentfailure:test:1"));
+ DocumentPut put2 = new DocumentPut(type, new DocumentId("doc:permanentfailure:test:2"));
+ DocumentPut put3 = new DocumentPut(type, new DocumentId("doc:permanentfailure:test:3"));
+
+ // Process them
+ service.process(put1);
+ service.process(put2);
+ service.process(put3);
+ while (service.doWork()) {}
+
+ // Verify
+ assertEquals(new StringFieldValue("done 3"), put1.getDocument().getFieldValue("test"));
+ assertEquals(new StringFieldValue("done 2"), put2.getDocument().getFieldValue("test")); // Due to PERMANENT_FAILURE in 2
+ assertNull(put3.getDocument().getFieldValue("test")); //service is disabled now
+ assertFalse(service.isInService());
+
+ service.setInService(true);
+ while (service.doWork()) {}
+
+ assertEquals(new StringFieldValue("done 3"), put3.getDocument().getFieldValue("test"));
+ assertTrue(service.isInService());
+ }
+
+ public static class SettingValueProcessor extends DocumentProcessor {
+
+ private String value;
+
+ public SettingValueProcessor(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ ((DocumentPut)op).getDocument().setFieldValue("test", value);
+ }
+ return Progress.DONE;
+ }
+ }
+
+ public static class FailingProcessor extends SettingValueProcessor {
+
+ public FailingProcessor(String name) {
+ super(name);
+ }
+
+ @Override
+ public Progress process(Processing processing) {
+ super.process(processing);
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (op.getId().toString().equals("doc:permanentfailure:test:2")) {
+ return DocumentProcessor.Progress.PERMANENT_FAILURE;
+ }
+ }
+ return DocumentProcessor.Progress.DONE;
+ }
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java
new file mode 100644
index 00000000000..184b0aa9e88
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/FailingWithErrorTestCase.java
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class FailingWithErrorTestCase extends junit.framework.TestCase {
+
+ public void testErrors() {
+ DocprocService service = new DocprocService("failing");
+ DocumentProcessor first = new ErrorThrowingProcessor();
+ service.setCallStack(new CallStack().addLast(first));
+ service.setInService(true);
+
+ DocumentType type = new DocumentType("test");
+ type.addField("test", DataType.STRING);
+ DocumentPut put = new DocumentPut(type, new DocumentId("doc:failing:test:1"));
+ put.getDocument().setFieldValue("test", "foobar");
+
+ service.process(put);
+ assertEquals(1, service.getQueueSize());
+ try {
+ while (service.doWork()) { }
+ fail("Should have gotten OOME here");
+ } catch (Throwable t) {
+ //we don't want a finally block in doWork()!
+ assertEquals(0, service.getQueueSize());
+ }
+ assertEquals(0, service.getQueueSize());
+
+ }
+
+ private class ErrorThrowingProcessor extends DocumentProcessor {
+ @Override
+ public Progress process(Processing processing) {
+ throw new OutOfMemoryError("Einar is out of mem");
+ }
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/IncrementingDocumentProcessor.java b/docproc/src/test/java/com/yahoo/docproc/IncrementingDocumentProcessor.java
new file mode 100644
index 00000000000..660daec67d8
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/IncrementingDocumentProcessor.java
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DocumentPut;
+
+/**
+ * Document processor used for a simple, silly test.
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class IncrementingDocumentProcessor extends SimpleDocumentProcessor {
+ int counter = 0;
+
+ @Override
+ public void process(DocumentPut put) {
+ System.err.println(counter + " DocumentPut: " + put);
+ counter++;
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java b/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java
new file mode 100644
index 00000000000..9e5f7a43c74
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/NotAcceptingNewProcessingsTestCase.java
@@ -0,0 +1,27 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class NotAcceptingNewProcessingsTestCase extends junit.framework.TestCase {
+
+ public void testNotAccepting() {
+ DocprocService service = new DocprocService("habla");
+ service.setCallStack(new CallStack());
+ service.setInService(true);
+
+ service.process(new Processing());
+ assertEquals(1, service.getQueueSize());
+
+ service.setAcceptingNewProcessings(false);
+
+ try {
+ service.process(new Processing());
+ fail("Should have gotten IllegalStateException here");
+ } catch (IllegalStateException ise) {
+ //ok!
+ }
+ assertEquals(1, service.getQueueSize());
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/ProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/ProcessingTestCase.java
new file mode 100644
index 00000000000..296c8d163e5
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/ProcessingTestCase.java
@@ -0,0 +1,50 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DocumentOperation;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ * @since 5.1.10
+ */
+public class ProcessingTestCase {
+
+ @Test
+ public void serviceName() {
+ assertThat(new Processing().getServiceName(), is("default"));
+ assertThat(new Processing("foobar", (DocumentOperation) null, null).getServiceName(), is("foobar"));
+ }
+
+ @Test
+ public void contextVariables() {
+ Processing p = new Processing();
+
+ p.setVariable("foo", "banana");
+ p.setVariable("bar", "apple");
+
+ Iterator<Map.Entry<String, Object>> it = p.getVariableAndNameIterator();
+ assertThat(it.hasNext(), is(true));
+ assertThat(it.next(), not(nullValue()));
+ assertThat(it.hasNext(), is(true));
+ assertThat(it.next(), not(nullValue()));
+ assertThat(it.hasNext(), is(false));
+
+ assertThat(p.hasVariable("foo"), is(true));
+ assertThat(p.hasVariable("bar"), is(true));
+ assertThat(p.hasVariable("baz"), is(false));
+
+ assertThat(p.removeVariable("foo"), is("banana"));
+
+ p.clearVariables();
+
+ it = p.getVariableAndNameIterator();
+ assertThat(it.hasNext(), is(false));
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java b/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java
new file mode 100644
index 00000000000..d8398f1fe47
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/ProcessingUpdateTestCase.java
@@ -0,0 +1,103 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.Field;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.document.update.AssignValueUpdate;
+import com.yahoo.document.update.FieldUpdate;
+import com.yahoo.document.update.ValueUpdate;
+
+import java.util.List;
+import java.util.StringTokenizer;
+
+/**
+ * Simple test case for testing that processing of both documents and
+ * document updates works.
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class ProcessingUpdateTestCase extends junit.framework.TestCase {
+
+ private DocumentPut put;
+ private DocumentUpdate update;
+
+ private DocumentTypeManager dtm;
+
+ public void testProcessingUpdates() {
+ DocumentType articleType = new DocumentType("article");
+ articleType.addField(new Field("body", DataType.STRING, true));
+ articleType.addField(new Field("title", DataType.STRING, true));
+ dtm = new DocumentTypeManager();
+ dtm.registerDocumentType(articleType);
+
+ put = new DocumentPut(articleType, "doc:banana:apple");
+ put.getDocument().setFieldValue("body", "this is the body of the article, blah blah blah");
+ FieldUpdate upd = FieldUpdate.createAssign(articleType.getField("body"), new StringFieldValue("this is the updated body of the article, blahdi blahdi blahdi"));
+ update = new DocumentUpdate(articleType, new DocumentId("doc:grape:orange"));
+ update.addFieldUpdate(upd);
+
+ DocprocService service = new DocprocService("update");
+ DocumentProcessor firstP = new TitleDocumentProcessor();
+ service.setCallStack(new CallStack().addLast(firstP));
+ service.setInService(true);
+
+
+
+ Processing p = new Processing();
+ p.addDocumentOperation(put);
+ p.addDocumentOperation(update);
+
+ service.process(p);
+
+ while (service.doWork()) { }
+
+ List<DocumentOperation> operations = p.getDocumentOperations();
+ Document first = ((DocumentPut)operations.get(0)).getDocument();
+ assertEquals(new StringFieldValue("this is the body of the article, blah blah blah"), first.getFieldValue("body"));
+ assertEquals(new StringFieldValue("body blah blah blah "), first.getFieldValue("title"));
+
+ DocumentUpdate second = (DocumentUpdate) operations.get(1);
+ FieldUpdate firstUpd = second.getFieldUpdate(0);
+ assertEquals(ValueUpdate.ValueUpdateClassID.ASSIGN, firstUpd.getValueUpdate(0).getValueUpdateClassID());
+ assertEquals(new StringFieldValue("this is the updated body of the article, blahdi blahdi blahdi"), firstUpd.getValueUpdate(0)
+ .getValue());
+
+ FieldUpdate secondUpd = second.getFieldUpdate(1);
+ assertEquals(ValueUpdate.ValueUpdateClassID.ASSIGN, secondUpd.getValueUpdate(0).getValueUpdateClassID());
+ assertEquals(new StringFieldValue("body blahdi blahdi blahdi "), secondUpd.getValueUpdate(0).getValue());
+ }
+
+ private class TitleDocumentProcessor extends SimpleDocumentProcessor {
+ @Override
+ public void process(DocumentPut doc) {
+ put.getDocument().setFieldValue("title", extractTitle(put.getDocument().getFieldValue("body").toString()));
+ }
+
+ @Override
+ public void process(DocumentUpdate upd) {
+ FieldUpdate bodyFieldUpdate = upd.getFieldUpdate("body");
+ AssignValueUpdate au = (AssignValueUpdate) bodyFieldUpdate.getValueUpdate(0);
+ FieldUpdate titleUpd = FieldUpdate.createAssign(upd.getType().getField("title"), new StringFieldValue(extractTitle(((StringFieldValue) au.getValue()).getString())));
+ upd.addFieldUpdate(titleUpd);
+ }
+
+ private String extractTitle(String body) {
+ if (body == null) return null;
+ StringTokenizer strTok = new StringTokenizer(body, " ");
+ String title = "";
+ while (strTok.hasMoreTokens()) {
+ String word = strTok.nextToken();
+ if (word.startsWith("b")) title += word + " ";
+ }
+ return title;
+ }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java
new file mode 100644
index 00000000000..8ca553b9290
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessingTestCase.java
@@ -0,0 +1,62 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.component.chain.dependencies.After;
+import com.yahoo.docproc.Accesses.Field.Tree;
+
+/**
+ * Tests the basic operation of the docproc service
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a>
+ */
+public class SimpleDocumentProcessingTestCase extends DocumentProcessingAbstractTestCase {
+
+ public SimpleDocumentProcessingTestCase(String name) {
+ super(name);
+ }
+
+ /**
+ * Tests chaining of some processors, and execution of the processors
+ * on some documents
+ */
+ public void testSimpleProcessing() {
+ // Set up service programmatically
+ DocprocService service = new DocprocService("simple");
+ DocumentProcessor first = new TestDocumentProcessor1();
+ DocumentProcessor second = new TestDocumentProcessor2();
+ DocumentProcessor third = new TestDocumentProcessor3();
+ service.setCallStack(new CallStack().addLast(first).addLast(second).addLast(third));
+ service.setInService(true);
+
+ assertProcessingWorks(service);
+ }
+
+ public void testAnnotationBasic() {
+ Accesses accesses = MyDocProc.class.getAnnotation(Accesses.class);
+ After after = MyDocProc.class.getAnnotation(After.class);
+ assertNotNull(accesses);
+ assertNotNull(after);
+ assertEquals(after.value()[0], "MyOtherDocProc");
+ assertEquals(after.value()[1], "AnotherDocProc");
+ assertEquals(accesses.value()[0].name(), "myField1");
+ assertEquals(accesses.value()[1].annotations()[0].consumes()[0], "word");
+ assertEquals(accesses.value()[1].annotations()[0].consumes()[1], "phrase");
+ }
+
+ @Accesses({ @Accesses.Field(name = "myField1", dataType = "string",
+ description = "What is done on field myField1",
+ annotations = @Tree(produces = { "word", "sentence" }, consumes = "word")),
+ @Accesses.Field(name = "myField2", dataType = "string",
+ description = "What is done on field myField2",
+ annotations = @Tree(consumes = { "word", "phrase" })) })
+ @After({ "MyOtherDocProc", "AnotherDocProc" })
+ public static class MyDocProc extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ }
+}
+
diff --git a/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java
new file mode 100644
index 00000000000..aae98dd22e7
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/SimpleDocumentProcessorTestCase.java
@@ -0,0 +1,152 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.container.StatisticsConfig;
+import com.yahoo.docproc.jdisc.metric.NullMetric;
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentRemove;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.document.idstring.UserDocIdString;
+import com.yahoo.statistics.StatisticsImpl;
+import org.junit.Test;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNull.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class SimpleDocumentProcessorTestCase {
+
+ private static DocprocService setupDocprocService(SimpleDocumentProcessor processor) {
+ CallStack stack = new CallStack("default", new StatisticsImpl(new StatisticsConfig(new StatisticsConfig.Builder())), new NullMetric());
+ stack.addLast(processor);
+ DocprocService service = new DocprocService("default");
+ service.setCallStack(stack);
+ service.setInService(true);
+ return service;
+ }
+
+ private static Processing getProcessing(DocumentOperation... operations) {
+ Processing processing = new Processing();
+
+ for (DocumentOperation op : operations) {
+ processing.addDocumentOperation(op);
+ }
+
+ return processing;
+ }
+
+ @Test
+ public void requireThatProcessingMultipleOperationsWork() {
+ DocumentType type = new DocumentType("foobar");
+ type.addField("title", DataType.STRING);
+
+ Processing p = getProcessing(new DocumentPut(type, "doc:this:is:a:document"),
+ new DocumentUpdate(type, "doc:this:is:an:update"),
+ new DocumentRemove(new DocumentId("doc:this:is:a:remove")));
+
+ DocprocService service = setupDocprocService(new VerySimpleDocumentProcessor());
+ service.getExecutor().process(p);
+
+ assertThat(p.getDocumentOperations().size(), is(3));
+ assertThat(p.getDocumentOperations().get(0) instanceof DocumentPut, is(true));
+ assertThat(((DocumentPut) p.getDocumentOperations().get(0)).getDocument().getFieldValue("title").getWrappedValue(),
+ is("processed"));
+ assertThat(p.getDocumentOperations().get(1) instanceof DocumentUpdate, is(true));
+ assertThat(p.getDocumentOperations().get(2) instanceof DocumentRemove, is(true));
+ assertThat(p.getDocumentOperations().get(2).getId().toString(),
+ is("userdoc:foobar:1234:something"));
+ }
+
+ @Test
+ public void requireThatProcessingSingleOperationWorks() {
+ DocumentType type = new DocumentType("foobar");
+ type.addField("title", DataType.STRING);
+
+ Processing p = getProcessing(new DocumentPut(type, "doc:this:is:a:document"));
+ DocprocService service = setupDocprocService(new VerySimpleDocumentProcessor());
+ service.getExecutor().process(p);
+
+ assertThat(p.getDocumentOperations().size(), is(1));
+ assertThat(p.getDocumentOperations().get(0) instanceof DocumentPut, is(true));
+ assertThat(((DocumentPut) p.getDocumentOperations().get(0)).getDocument().getFieldValue("title").getWrappedValue(),
+ is("processed"));
+ }
+
+ @Test
+ public void requireThatThrowingTerminatesIteration() {
+ DocumentType type = new DocumentType("foobar");
+ type.addField("title", DataType.STRING);
+
+ Processing p = getProcessing(new DocumentPut(type, "doc:this:is:a:document"),
+ new DocumentRemove(new DocumentId("doc:this:is:a:remove")),
+ new DocumentPut(type, "doc:this:is:a:document2"));
+
+ DocprocService service = setupDocprocService(new SimpleDocumentProcessorThrowingOnRemovesAndUpdates());
+ try {
+ service.getExecutor().process(p);
+ } catch (RuntimeException re) {
+ //ok
+ }
+
+ assertThat(p.getDocumentOperations().size(), is(3));
+ assertThat(p.getDocumentOperations().get(0) instanceof DocumentPut, is(true));
+ assertThat(((DocumentPut) p.getDocumentOperations().get(0)).getDocument().getFieldValue("title").getWrappedValue(),
+ is("processed"));
+ assertThat(p.getDocumentOperations().get(1) instanceof DocumentRemove, is(true));
+ assertThat(p.getDocumentOperations().get(1).getId().toString(),
+ is("doc:this:is:a:remove"));
+ assertThat(p.getDocumentOperations().get(2) instanceof DocumentPut, is(true));
+ assertThat(((DocumentPut) p.getDocumentOperations().get(2)).getDocument().getFieldValue("title"),
+ nullValue());
+
+
+ }
+
+ public static class VerySimpleDocumentProcessor extends SimpleDocumentProcessor {
+
+ @Override
+ public void process(DocumentPut put) {
+ put.getDocument().setFieldValue("title", new StringFieldValue("processed"));
+ }
+
+ @Override
+ public void process(DocumentRemove remove) {
+ remove.getId().setId(new UserDocIdString("foobar", 1234L, "something"));
+ }
+
+ @Override
+ public void process(DocumentUpdate update) {
+ update.clearFieldUpdates();
+ }
+
+ }
+
+ public static class SimpleDocumentProcessorThrowingOnRemovesAndUpdates extends SimpleDocumentProcessor {
+
+ @Override
+ public void process(DocumentPut put) {
+ put.getDocument().setFieldValue("title", new StringFieldValue("processed"));
+ }
+
+ @Override
+ public void process(DocumentRemove remove) {
+ throw new RuntimeException("oh no.");
+ }
+
+ @Override
+ public void process(DocumentUpdate update) {
+ throw new RuntimeException("oh no.");
+ }
+
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java b/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java
new file mode 100644
index 00000000000..076a8610c4a
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/TransientFailureTestCase.java
@@ -0,0 +1,97 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc;
+
+import com.yahoo.document.DataType;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentOperation;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class TransientFailureTestCase extends junit.framework.TestCase {
+ DocumentType type;
+
+ public void setUp() {
+ type = new DocumentType("test");
+ type.addField("boo", DataType.STRING);
+ }
+
+ public void testTransientFailures() {
+ DocprocService service = new DocprocService("transfail");
+ CallStack stack = new CallStack();
+ stack.addNext(new OkDocProc()).addNext(new TransientFailDocProc());
+ service.setCallStack(stack);
+ service.setInService(true);
+
+ EndpointSupportingTransientFailures endpoint = new EndpointSupportingTransientFailures();
+
+ DocumentPut put;
+
+ put = new DocumentPut(type, new DocumentId("doc:transfail:bad"));
+ service.process(put, endpoint);
+ while (service.doWork()) { }
+ assertEquals(0, endpoint.numOk);
+ assertEquals(1, endpoint.numTransientFail);
+ assertEquals(0, endpoint.numFail);
+
+ put = new DocumentPut(type, new DocumentId("doc:transfail:verybad"));
+ service.process(put, endpoint);
+ while (service.doWork()) { }
+ assertEquals(0, endpoint.numOk);
+ assertEquals(1, endpoint.numTransientFail);
+ assertEquals(1, endpoint.numFail);
+
+ put = new DocumentPut(type, new DocumentId("doc:transfail:good"));
+ service.process(put, endpoint);
+ while (service.doWork()) { }
+ assertEquals(1, endpoint.numOk);
+ assertEquals(1, endpoint.numTransientFail);
+ assertEquals(1, endpoint.numFail);
+
+ put = new DocumentPut(type, new DocumentId("doc:transfail:veryverybad"));
+ service.process(put, endpoint);
+ while (service.doWork()) { }
+ assertEquals(1, endpoint.numOk);
+ assertEquals(1, endpoint.numTransientFail);
+ assertEquals(2, endpoint.numFail);
+ }
+
+ private static class OkDocProc extends SimpleDocumentProcessor {
+ }
+
+ private static class TransientFailDocProc extends DocumentProcessor {
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (op.getId().toString().equals("doc:transfail:bad")) {
+ throw new TransientFailureException("sorry, try again later");
+ } else if (op.getId().toString().equals("doc:transfail:verybad")) {
+ return Progress.FAILED;
+ } else if (op.getId().toString().equals("doc:transfail:veryverybad")) {
+ return Progress.PERMANENT_FAILURE;
+ }
+ }
+ return Progress.DONE;
+ }
+ }
+
+ private static class EndpointSupportingTransientFailures implements ProcessingEndpoint {
+ private volatile int numOk = 0;
+ private volatile int numTransientFail = 0;
+ private volatile int numFail = 0;
+
+ public void processingDone(Processing processing) {
+ numOk++;
+ }
+
+ public void processingFailed(Processing processing, Exception exception) {
+ if (exception instanceof TransientFailureException) {
+ numTransientFail++;
+ } else {
+ numFail++;
+ }
+ }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java
new file mode 100644
index 00000000000..f928ec4febd
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocprocThreadPoolExecutorTestCase.java
@@ -0,0 +1,83 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocprocThreadPoolExecutorTestCase {
+ private final Set<Long> threadIds = Collections.synchronizedSet(new HashSet<Long>());
+
+ @Test
+ public void threadPool() throws InterruptedException {
+ int numThreads = 8;
+ int numTasks = 200;
+
+ LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
+ DocprocThreadManager mgr = new DocprocThreadManager(1000l);
+ DocprocThreadPoolExecutor pool = new DocprocThreadPoolExecutor(numThreads, q, mgr);
+
+ List<MockedDocumentProcessingTask> tasks = new ArrayList<>(numTasks);
+ for (int i = 0; i < numTasks; i++) {
+ tasks.add(new MockedDocumentProcessingTask());
+ }
+
+ for (int i = 0; i < numTasks; i++) {
+ pool.execute(tasks.get(i));
+ }
+ pool.shutdown();
+ pool.awaitTermination(120L, TimeUnit.SECONDS);
+
+ for (int i = 0; i < numTasks; i++) {
+ assertTrue(tasks.get(i).hasBeenRun());
+ }
+
+ System.err.println(threadIds);
+ assertEquals(numThreads, threadIds.size());
+ }
+
+ private class MockedDocumentProcessingTask extends DocumentProcessingTask {
+ private boolean hasBeenRun = false;
+
+ public MockedDocumentProcessingTask() {
+ super(null, null, null);
+ }
+
+ @Override
+ public void run() {
+ threadIds.add(Thread.currentThread().getId());
+ System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " running task " + this);
+ for (int i = 0; i < 100000; i++) {
+ Math.sin((double) (System.currentTimeMillis() / 10000L));
+ }
+ System.err.println(System.currentTimeMillis() + " MOCK Thread " + Thread.currentThread().getId() + " DONE task " + this);
+ hasBeenRun = true;
+ }
+
+ @Override
+ public int getApproxSize() {
+ return 333;
+ }
+
+ @Override
+ public String toString() {
+ return "seqNum " + getSeqNum();
+ }
+
+ public boolean hasBeenRun() {
+ return hasBeenRun;
+ }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java
new file mode 100644
index 00000000000..271ad09ab1d
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerAllMessageTypesTestCase.java
@@ -0,0 +1,230 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.*;
+import com.yahoo.document.datatypes.FieldValue;
+import com.yahoo.document.datatypes.IntegerFieldValue;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.document.update.FieldUpdate;
+import com.yahoo.documentapi.messagebus.protocol.BatchDocumentUpdateMessage;
+import com.yahoo.documentapi.messagebus.protocol.GetDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.vdslib.DocumentList;
+import com.yahoo.vdslib.Entry;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocumentProcessingHandlerAllMessageTypesTestCase extends DocumentProcessingHandlerTestBase {
+
+ private static final String FOOBAR = "foobar";
+ private final DocumentType type;
+
+ public DocumentProcessingHandlerAllMessageTypesTestCase() {
+ this.type = new DocumentType("baz");
+ this.type.addField(new Field("blahblah", DataType.STRING));
+ this.type.addField(new Field("defaultWait", DataType.INT));
+ this.type.addField(new Field("customWait", DataType.INT));
+ }
+
+ @Test
+ public void testMessages() throws InterruptedException {
+ get();
+ put();
+ remove();
+ update();
+ batchDocumentUpdate();
+ }
+
+ private void get() throws InterruptedException {
+ GetDocumentMessage message = new GetDocumentMessage(new DocumentId("doc:this:is:a:test"), "fieldset?");
+
+ assertTrue(sendMessage(FOOBAR, message));
+
+ Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(result);
+ remoteServer.ackMessage(result);
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+
+ assertThat(result, instanceOf(GetDocumentMessage.class));
+
+ assertFalse(reply.hasErrors());
+ }
+
+
+ private void put() throws InterruptedException {
+ Document document = new Document(getType(), "doc:baz:foo");
+ document.setFieldValue("blahblah", new StringFieldValue("This is a test."));
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(document));
+
+ assertTrue(sendMessage(FOOBAR, message));
+
+ Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(result);
+ remoteServer.ackMessage(result);
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+
+ assertThat(result, instanceOf(PutDocumentMessage.class));
+ PutDocumentMessage outputMsg = (PutDocumentMessage) result;
+ assertThat(((StringFieldValue) outputMsg.getDocumentPut().getDocument().getFieldValue("blahblah")).getString(), is("THIS IS A TEST."));
+
+ assertFalse(reply.hasErrors());
+ }
+
+ private void remove() throws InterruptedException {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:12345:6789"));
+
+ assertTrue(sendMessage(FOOBAR, message));
+
+ Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(result);
+ remoteServer.ackMessage(result);
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+
+ assertThat(result, instanceOf(RemoveDocumentMessage.class));
+ RemoveDocumentMessage outputMsg = (RemoveDocumentMessage) result;
+ assertThat(outputMsg.getDocumentId().toString(), is("doc:12345:6789"));
+
+ assertFalse(reply.hasErrors());
+ }
+
+ private void update() throws InterruptedException {
+ DocumentUpdate documentUpdate = new DocumentUpdate(getType(), "doc:baz:foo");
+ UpdateDocumentMessage message = new UpdateDocumentMessage(documentUpdate);
+
+ assertTrue(sendMessage(FOOBAR, message));
+
+ Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(result);
+ remoteServer.ackMessage(result);
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+
+ assertThat(result, instanceOf(UpdateDocumentMessage.class));
+ UpdateDocumentMessage outputMsg = (UpdateDocumentMessage) result;
+ assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:baz:foo"));
+
+ assertFalse(reply.hasErrors());
+ }
+
+ private void batchDocumentUpdate() throws InterruptedException {
+ DocumentUpdate doc1 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:multi:1"));
+ DocumentUpdate doc2 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:multi:2"));
+
+ Field testField = getType().getField("blahblah");
+ doc1.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("1 not yet processed")));
+ doc2.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("2 not yet processed")));
+
+ BatchDocumentUpdateMessage message = new BatchDocumentUpdateMessage(12345);
+ message.addUpdate(doc1);
+ message.addUpdate(doc2);
+
+ assertTrue(sendMessage(FOOBAR, message));
+
+ Message remote1 = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertTrue(remote1 instanceof UpdateDocumentMessage);
+ remoteServer.ackMessage(remote1);
+ assertNull(driver.client().awaitReply(100, TimeUnit.MILLISECONDS));
+
+ Message remote2 = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertTrue(remote2 instanceof UpdateDocumentMessage);
+ remoteServer.ackMessage(remote2);
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ }
+
+ @Override
+ public List<Pair<String,CallStack>> getCallStacks() {
+ CallStack stack = new CallStack();
+ stack.addLast(new YallaDocumentProcessor());
+ stack.addLast(new WaitingDefaultDocumentProcessor());
+ stack.addLast(new WaitingCustomDocumentProcessor());
+
+ ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(1);
+ stacks.add(new Pair<>(FOOBAR, stack));
+ return stacks;
+ }
+
+ @Override
+ public DocumentType getType() {
+ return type;
+ }
+
+ private static class YallaDocumentProcessor extends DocumentProcessor {
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (op instanceof DocumentPut) {
+ Document doc = ((DocumentPut) op).getDocument();
+ for (Field f : doc.getDataType().fieldSet()) {
+ FieldValue val = doc.getFieldValue(f);
+ if (val instanceof StringFieldValue) {
+ StringFieldValue sf = (StringFieldValue) val;
+ doc.setFieldValue(f, new StringFieldValue(sf.getString().toUpperCase()));
+ }
+ }
+ }
+ //don't touch updates or removes
+ }
+ return Progress.DONE;
+ }
+ }
+
+ private static abstract class WaitingDocumentProcessor extends DocumentProcessor {
+ protected Progress laterProgress;
+ protected String waitKey;
+
+ @Override
+ public Progress process(Processing processing) {
+ for (DocumentOperation op : processing.getDocumentOperations()) {
+ if (op instanceof DocumentPut) {
+ Document doc = ((DocumentPut) op).getDocument();
+ if (doc.getFieldValue(waitKey) == null) {
+ System.out.println(this.getClass().getSimpleName() + ": returning LATER for " + doc);
+ doc.setFieldValue(waitKey, new IntegerFieldValue(1));
+ return laterProgress;
+ }
+ }
+ //don't touch updates or removes
+ }
+ return Progress.DONE;
+ }
+ }
+
+ private static class WaitingDefaultDocumentProcessor extends WaitingDocumentProcessor {
+ private WaitingDefaultDocumentProcessor() {
+ laterProgress = Progress.LATER;
+ waitKey = "defaultWait";
+ }
+ }
+
+ private static class WaitingCustomDocumentProcessor extends WaitingDocumentProcessor {
+ private WaitingCustomDocumentProcessor() {
+ laterProgress = Progress.later(60);
+ waitKey = "customWait";
+ }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerBasicTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerBasicTestCase.java
new file mode 100644
index 00000000000..d308d0a484a
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerBasicTestCase.java
@@ -0,0 +1,79 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.SimpleDocumentProcessor;
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.Field;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocumentProcessingHandlerBasicTestCase extends DocumentProcessingHandlerTestBase {
+ private DocumentType type;
+
+ public DocumentProcessingHandlerBasicTestCase() {
+ this.type = new DocumentType("yalla");
+ this.type.addField(new Field("blahblah", DataType.STRING));
+ }
+
+ @Test
+ public void testPut() throws InterruptedException {
+ Document document = new Document(getType(), "doc:yalla:balla");
+ document.setFieldValue("blahblah", new StringFieldValue("This is a test."));
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(document));
+
+ assertTrue(sendMessage("foobar", message));
+
+ Message msg = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(msg);
+ remoteServer.ackMessage(msg);
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+
+ assertThat((msg instanceof PutDocumentMessage), is(true));
+ PutDocumentMessage put = (PutDocumentMessage) msg;
+ Document outDoc = put.getDocumentPut().getDocument();
+
+ assertThat(document, equalTo(outDoc));
+ assertFalse(reply.hasErrors());
+ }
+
+ @Override
+ public List<Pair<String,CallStack>> getCallStacks() {
+ CallStack stack = new CallStack();
+ stack.addLast(new TestDocumentProcessor());
+
+ ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(1);
+ stacks.add(new Pair<>("foobar", stack));
+ return stacks;
+ }
+
+ @Override
+ public DocumentType getType() {
+ return type;
+ }
+
+ public static class TestDocumentProcessor extends SimpleDocumentProcessor {
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java
new file mode 100644
index 00000000000..b637f35af4f
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerForkTestCase.java
@@ -0,0 +1,215 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.*;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.WriteDocumentReply;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.vdslib.DocumentList;
+import com.yahoo.vdslib.Entry;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocumentProcessingHandlerForkTestCase extends DocumentProcessingHandlerTestBase {
+
+ private static final String TOMANYALLINSAMEBUCKET = "tomanyallinsamebucket";
+ private static final String TOMANYSOMEINSAMEBUCKET = "tomanysomeinsamebucket";
+ private static final String TOMANY = "many";
+ private static final String TOONE = "toone";
+ private static final String TOZERO = "tozero";
+ private final DocumentType type;
+
+ public DocumentProcessingHandlerForkTestCase() {
+ this.type = new DocumentType("baz");
+ this.type.addField(new Field("blahblah", DataType.STRING));
+ }
+
+ @Override
+ public DocumentType getType() {
+ return type;
+ }
+
+ @Test
+ public void testMessages() throws InterruptedException {
+ putToFourPuts();
+ putToManyAllInSameBucket();
+ putToManySomeInSameBucket();
+ putToOne();
+ putToZero();
+ }
+
+ private void putToManyAllInSameBucket() throws InterruptedException {
+ assertPutMessages(createPutDocumentMessage(), TOMANYALLINSAMEBUCKET,
+ "userdoc:123456:11111:foo:er:bra",
+ "userdoc:123456:11111:foo:trallala",
+ "userdoc:123456:11111:foo:a");
+ }
+
+ private void putToManySomeInSameBucket() throws InterruptedException {
+ assertPutMessages(createPutDocumentMessage(), TOMANYSOMEINSAMEBUCKET,
+ "userdoc:123456:7890:bar:er:bra",
+ "doc:foo:bar:er:ja",
+ "userdoc:567890:1234:a",
+ "doc:foo:bar:hahahhaa",
+ "userdoc:123456:7890:a:a",
+ "doc:foo:bar:aa",
+ "userdoc:567890:1234:bar:ala",
+ "doc:foo:bar:sdfgsaa",
+ "userdoc:123456:7890:bar:tralsfa",
+ "doc:foo:bar:dfshaa");
+ }
+
+ private void putToFourPuts() throws InterruptedException {
+ assertPutMessages(createPutDocumentMessage(), TOMANY,
+ "doc:foo:bar:er:bra",
+ "doc:foo:bar:er:ja",
+ "doc:foo:bar:hahahhaa",
+ "doc:foo:bar:trallala");
+ }
+
+ private void putToOne() throws InterruptedException {
+ assertPutMessages(createPutDocumentMessage(), TOONE,
+ "doc:baz:bar");
+ }
+
+ private void putToZero() throws InterruptedException {
+ assertTrue(sendMessage(TOZERO, createPutDocumentMessage()));
+
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertTrue(reply instanceof WriteDocumentReply);
+ assertFalse(reply.hasErrors());
+ }
+
+ @Override
+ protected List<Pair<String, CallStack>> getCallStacks() {
+ ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(5);
+ stacks.add(new Pair<>(TOMANYALLINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsAllInSameBucketProcessor())));
+ stacks.add(new Pair<>(TOMANYSOMEINSAMEBUCKET, new CallStack().addLast(new OneToManyDocumentsSomeInSameBucketProcessor())));
+ stacks.add(new Pair<>(TOMANY, new CallStack().addLast(new OneToManyDocumentsProcessor())));
+ stacks.add(new Pair<>(TOONE, new CallStack().addLast(new OneToOneDocumentsProcessor())));
+ stacks.add(new Pair<>(TOZERO, new CallStack().addLast(new OneToZeroDocumentsProcessor())));
+ return stacks;
+ }
+
+ protected PutDocumentMessage createPutDocumentMessage() {
+ Document document = new Document(getType(), "doc:baz:bar");
+ document.setFieldValue("blahblah", new StringFieldValue("This is a test."));
+ return new PutDocumentMessage(new DocumentPut(document));
+ }
+
+ private void assertPutMessages(DocumentMessage msg, String route, String... expected) throws InterruptedException {
+ msg.getTrace().setLevel(9);
+ assertTrue(sendMessage(route, msg));
+
+ String[] actual = new String[expected.length];
+ for (int i = 0; i < expected.length; ++i) {
+ Message remoteMsg = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertTrue(remoteMsg instanceof PutDocumentMessage);
+ remoteMsg.getTrace().trace(1, "remoteServer.ack(" + expected[i] + ")");
+ remoteServer.ackMessage(remoteMsg);
+ actual[i] = ((PutDocumentMessage)remoteMsg).getDocumentPut().getDocument().getId().toString();
+ }
+ assertNull(remoteServer.awaitMessage(100, TimeUnit.MILLISECONDS));
+
+ Arrays.sort(expected);
+ Arrays.sort(actual);
+ assertArrayEquals(expected, actual);
+
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+ String trace = reply.getTrace().toString();
+ for (String documentId : expected) {
+ assertTrue("missing trace for document '" + documentId + "'\n" + trace,
+ trace.contains("remoteServer.ack(" + documentId + ")"));
+ }
+ if (expected.length == 1) {
+ assertFalse("unexpected fork in trace for single document\n" + trace,
+ trace.contains("<fork>"));
+ } else {
+ assertTrue("missing fork in trace for " + expected.length + " split\n" + trace,
+ trace.contains("<fork>"));
+ }
+ }
+
+ public class OneToOneDocumentsProcessor extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ return Progress.DONE;
+ }
+ }
+
+ public class OneToManyDocumentsProcessor extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ List<DocumentOperation> operations = processing.getDocumentOperations();
+ operations.clear();
+ operations.add(new DocumentPut(type, "doc:foo:bar:er:bra"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:er:ja"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:trallala"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:hahahhaa"));
+ return Progress.DONE;
+ }
+ }
+
+ public class OneToZeroDocumentsProcessor extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ processing.getDocumentOperations().clear();
+ return Progress.DONE;
+ }
+ }
+
+ public class OneToManyDocumentsSomeInSameBucketProcessor extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ List<DocumentOperation> operations = processing.getDocumentOperations();
+ operations.clear();
+ operations.add(new DocumentPut(type, "userdoc:123456:7890:bar:er:bra"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:er:ja"));
+ operations.add(new DocumentPut(type, "userdoc:567890:1234:a"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:hahahhaa"));
+ operations.add(new DocumentPut(type, "userdoc:123456:7890:a:a"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:aa"));
+ operations.add(new DocumentPut(type, "userdoc:567890:1234:bar:ala"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:sdfgsaa"));
+ operations.add(new DocumentPut(type, "userdoc:123456:7890:bar:tralsfa"));
+ operations.add(new DocumentPut(type, "doc:foo:bar:dfshaa"));
+ return Progress.DONE;
+ }
+
+ }
+
+ public class OneToManyDocumentsAllInSameBucketProcessor extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ List<DocumentOperation> docs = processing.getDocumentOperations();
+ docs.clear();
+ docs.add(new DocumentPut(type, "userdoc:123456:11111:foo:er:bra"));
+ docs.add(new DocumentPut(type, "userdoc:123456:11111:foo:trallala"));
+ docs.add(new DocumentPut(type, "userdoc:123456:11111:foo:a"));
+ return Progress.DONE;
+ }
+
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
new file mode 100644
index 00000000000..4af0b148164
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTestBase.java
@@ -0,0 +1,131 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.container.core.document.ContainerDocumentConfig;
+import com.yahoo.container.jdisc.messagebus.MbusServerProvider;
+import com.yahoo.container.jdisc.messagebus.SessionCache;
+import com.yahoo.docproc.AbstractConcreteDocumentFactory;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.DocprocService;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.jdisc.messagebus.MbusRequestContext;
+
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.messagebus.loadtypes.LoadType;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jdisc.AbstractResource;
+import com.yahoo.jdisc.ReferencedResource;
+import com.yahoo.jdisc.application.ContainerBuilder;
+import com.yahoo.messagebus.Protocol;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.jdisc.MbusClient;
+import com.yahoo.messagebus.jdisc.test.RemoteServer;
+import com.yahoo.messagebus.jdisc.test.ServerTestDriver;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.messagebus.shared.SharedSourceSession;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public abstract class DocumentProcessingHandlerTestBase {
+
+ protected DocumentProcessingHandler handler;
+ protected ServerTestDriver driver;
+ protected RemoteServer remoteServer;
+ protected DocumentTypeManager documentTypeManager = new DocumentTypeManager();
+ SessionCache sessionCache;
+ private List<MbusServerProvider> serviceProviders = new ArrayList<>();
+
+ @Before
+ public void createHandler() {
+ documentTypeManager.register(getType());
+
+ Protocol protocol = new DocumentProtocol(documentTypeManager);
+
+ driver = ServerTestDriver.newInactiveInstanceWithProtocol(protocol);
+
+ sessionCache =
+ new SessionCache("raw:", driver.client().slobrokId(), "test", "raw:", null, "raw:", documentTypeManager);
+
+ ContainerBuilder builder = driver.parent().newContainerBuilder();
+ ComponentRegistry<DocprocService> registry = new ComponentRegistry<>();
+
+ handler = new DocumentProcessingHandler(registry,
+ new ComponentRegistry<>(),
+ new ComponentRegistry<>(),
+ new DocumentProcessingHandlerParameters().
+ setDocumentTypeManager(documentTypeManager).
+ setContainerDocumentConfig(new ContainerDocumentConfig(new ContainerDocumentConfig.Builder())));
+ builder.serverBindings().bind("mbus://*/*", handler);
+
+ ReferencedResource<SharedSourceSession> sessionRef = sessionCache.retainSource(new SourceSessionParams());
+ MbusClient sourceClient = new MbusClient(sessionRef.getResource());
+ builder.clientBindings().bind("mbus://*/source", sourceClient);
+ builder.clientBindings().bind("mbus://*/" + MbusRequestContext.internalNoThrottledSource, sourceClient);
+ sourceClient.start();
+
+ List<Pair<String, CallStack>> callStacks = getCallStacks();
+ List<AbstractResource> resources = new ArrayList<>();
+ for (Pair<String, CallStack> callStackPair : callStacks) {
+ DocprocService service = new DocprocService(callStackPair.getFirst());
+ service.setCallStack(callStackPair.getSecond());
+ service.setInService(true);
+
+ ComponentId serviceId = new ComponentId(service.getName());
+ registry.register(serviceId, service);
+
+ ComponentId sessionName = ComponentId.fromString("chain." + serviceId);
+ MbusServerProvider serviceProvider = new MbusServerProvider(sessionName, sessionCache, driver.parent());
+ serviceProvider.get().start();
+
+ serviceProviders.add(serviceProvider);
+
+ MbusClient intermediateClient = new MbusClient(serviceProvider.getSession());
+ builder.clientBindings().bind("mbus://*/" + sessionName.stringValue(), intermediateClient);
+ intermediateClient.start();
+ resources.add(intermediateClient);
+ }
+
+ driver.parent().activateContainer(builder);
+ sessionRef.getReference().close();
+ sourceClient.release();
+
+ for (AbstractResource resource : resources) {
+ resource.release();
+ }
+
+ remoteServer = RemoteServer.newInstance(driver.client().slobrokId(), "foobar", protocol);
+ }
+
+ @After
+ public void destroy() {
+ for (MbusServerProvider serviceProvider : serviceProviders) {
+ serviceProvider.deconstruct();
+ }
+ driver.close();
+ remoteServer.close();
+ }
+
+ protected abstract List<Pair<String, CallStack>> getCallStacks();
+
+ protected abstract DocumentType getType();
+
+ public boolean sendMessage(String destinationChainName, DocumentMessage msg) {
+ msg.setRoute(Route.parse("test/chain." + destinationChainName + " " + remoteServer.connectionSpec()));
+ msg.setPriority(DocumentProtocol.Priority.HIGH_1);
+ msg.setLoadType(LoadType.DEFAULT);
+ msg.getTrace().setLevel(9);
+ msg.setTimeRemaining(60 * 1000);
+ return driver.client().sendMessage(msg).isAccepted();
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTransformingMessagesTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTransformingMessagesTestCase.java
new file mode 100644
index 00000000000..f3b80506021
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingHandlerTransformingMessagesTestCase.java
@@ -0,0 +1,237 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.docproc.CallStack;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.*;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.document.update.FieldUpdate;
+import com.yahoo.documentapi.messagebus.protocol.*;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.Routable;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocumentProcessingHandlerTransformingMessagesTestCase extends DocumentProcessingHandlerTestBase {
+
+ private static final String FOOBAR = "foobar";
+ private final DocumentType type;
+
+ public DocumentProcessingHandlerTransformingMessagesTestCase() {
+ type = new DocumentType("foo");
+ type.addField("foostring", DataType.STRING);
+ }
+
+ @Override
+ public List<Pair<String, CallStack>> getCallStacks() {
+ CallStack stack = new CallStack();
+ stack.addLast(new TransformingDocumentProcessor());
+
+ ArrayList<Pair<String, CallStack>> stacks = new ArrayList<>(1);
+ stacks.add(new Pair<>(FOOBAR, stack));
+ return stacks;
+ }
+
+ @Override
+ public DocumentType getType() {
+ return type;
+ }
+
+ private Routable sendMessageAndGetResult(DocumentMessage message) throws InterruptedException {
+ assertTrue(sendMessage(FOOBAR, message));
+
+ Message result = remoteServer.awaitMessage(60, TimeUnit.SECONDS);
+ assertNotNull(result);
+ remoteServer.ackMessage(result);
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertFalse(reply.hasErrors());
+
+ return result;
+ }
+
+ @Test
+ public void testAllMessages() throws InterruptedException {
+ put();
+ remove();
+ update();
+ batchDocumentUpdate();
+ }
+
+ private void put() throws InterruptedException {
+ {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:put")));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(PutDocumentMessage.class));
+ PutDocumentMessage outputMsg = (PutDocumentMessage)result;
+ assertThat(outputMsg.getDocumentPut().getDocument().getFieldValue("foostring").toString(), is("banana"));
+ }
+ {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:remove")));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(RemoveDocumentMessage.class));
+ RemoveDocumentMessage outputMsg = (RemoveDocumentMessage)result;
+ assertThat(outputMsg.getDocumentId().toString(), is("doc:nodocstatus:put:to:remove"));
+ }
+ {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:update")));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(UpdateDocumentMessage.class));
+ UpdateDocumentMessage outputMsg = (UpdateDocumentMessage)result;
+ assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:nodocstatus:put:to:update"));
+ }
+ {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(new Document(getType(), "doc:nodocstatus:put:to:nothing")));
+ assertTrue(sendMessage(FOOBAR, message));
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertThat(reply, instanceOf(DocumentReply.class));
+ assertFalse(reply.hasErrors());
+ }
+ }
+
+ private void remove() throws InterruptedException {
+ {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:put"));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(PutDocumentMessage.class));
+ PutDocumentMessage outputMsg = (PutDocumentMessage)result;
+ assertThat(outputMsg.getDocumentPut().getDocument().getId().toString(), is("doc:nodocstatus:remove:to:put"));
+ }
+
+ {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:remove"));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(RemoveDocumentMessage.class));
+ RemoveDocumentMessage outputMsg = (RemoveDocumentMessage)result;
+ assertThat(outputMsg.getDocumentId().toString(), is("doc:nodocstatus:remove:to:remove"));
+ }
+ {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:update"));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(UpdateDocumentMessage.class));
+ UpdateDocumentMessage outputMsg = (UpdateDocumentMessage)result;
+ assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:nodocstatus:remove:to:update"));
+ }
+ {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(new DocumentId("doc:nodocstatus:remove:to:nothing"));
+ assertTrue(sendMessage(FOOBAR, message));
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertThat(reply, instanceOf(DocumentReply.class));
+ assertFalse(reply.hasErrors());
+ }
+ }
+
+ private void update() throws InterruptedException {
+ {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:put"));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(PutDocumentMessage.class));
+ PutDocumentMessage outputMsg = (PutDocumentMessage)result;
+ assertThat(outputMsg.getDocumentPut().getDocument().getId().toString(), is("doc:nodocstatus:update:to:put"));
+ }
+
+ {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:remove"));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(RemoveDocumentMessage.class));
+ RemoveDocumentMessage outputMsg = (RemoveDocumentMessage)result;
+ assertThat(outputMsg.getDocumentId().toString(), is("doc:nodocstatus:update:to:remove"));
+ }
+ {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:update"));
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(UpdateDocumentMessage.class));
+ UpdateDocumentMessage outputMsg = (UpdateDocumentMessage)result;
+ assertThat(outputMsg.getDocumentUpdate().getId().toString(), is("doc:nodocstatus:update:to:update"));
+ }
+ {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(new DocumentUpdate(getType(), "doc:nodocstatus:update:to:nothing"));
+ assertTrue(sendMessage(FOOBAR, message));
+ Reply reply = driver.client().awaitReply(60, TimeUnit.SECONDS);
+ assertNotNull(reply);
+ assertThat(reply, instanceOf(DocumentReply.class));
+ assertFalse(reply.hasErrors());
+ }
+ }
+
+ private void batchDocumentUpdate() throws InterruptedException {
+ DocumentUpdate doc1 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:batch:nodocstatus:keep:this"));
+ DocumentUpdate doc2 = new DocumentUpdate(getType(), new DocumentId("userdoc:test:12345:batch:nodocstatus:skip:this"));
+
+ Field testField = getType().getField("foostring");
+ doc1.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("1 not yet processed")));
+ doc2.addFieldUpdate(FieldUpdate.createAssign(testField, new StringFieldValue("2 not yet processed")));
+
+ BatchDocumentUpdateMessage message = new BatchDocumentUpdateMessage(12345);
+ message.addUpdate(doc1);
+ message.addUpdate(doc2);
+
+ Routable result = sendMessageAndGetResult(message);
+ assertThat(result, instanceOf(UpdateDocumentMessage.class));
+ DocumentUpdate outputUpd = ((UpdateDocumentMessage)result).getDocumentUpdate();
+ assertThat(outputUpd.getId().toString(), is("userdoc:test:12345:batch:nodocstatus:keep:this"));
+ }
+
+ public class TransformingDocumentProcessor extends DocumentProcessor {
+
+ @Override
+ public Progress process(Processing processing) {
+ ListIterator<DocumentOperation> it = processing.getDocumentOperations().listIterator();
+ while (it.hasNext()) {
+ DocumentOperation op = it.next();
+ String id = op.getId().toString();
+ if ("doc:nodocstatus:put:to:put".equals(id)) {
+ Document doc = ((DocumentPut)op).getDocument();
+ doc.setFieldValue("foostring", new StringFieldValue("banana"));
+ } else if ("doc:nodocstatus:put:to:remove".equals(id)) {
+ it.set(new DocumentRemove(new DocumentId(id)));
+ } else if ("doc:nodocstatus:put:to:update".equals(id)) {
+ it.set(new DocumentUpdate(getType(), id));
+ } else if ("doc:nodocstatus:put:to:nothing".equals(id)) {
+ it.remove();
+ } else if ("doc:nodocstatus:remove:to:put".equals(id)) {
+ it.set(new DocumentPut(getType(), op.getId()));
+ } else if ("doc:nodocstatus:remove:to:remove".equals(id)) {
+ //nada
+ } else if ("doc:nodocstatus:remove:to:update".equals(id)) {
+ it.set(new DocumentUpdate(getType(), id));
+ } else if ("doc:nodocstatus:remove:to:nothing".equals(id)) {
+ it.remove();
+ } else if ("doc:nodocstatus:update:to:put".equals(id)) {
+ it.set(new DocumentPut(getType(), op.getId()));
+ } else if ("doc:nodocstatus:update:to:remove".equals(id)) {
+ it.set(new DocumentRemove(new DocumentId(id)));
+ } else if ("doc:nodocstatus:update:to:update".equals(id)) {
+ //nada
+ } else if ("doc:nodocstatus:update:to:nothing".equals(id)) {
+ it.remove();
+ } else if ("userdoc:12345:6789:multiop:nodocstatus:keep:this".equals(id)) {
+ //nada
+ } else if ("userdoc:12345:6789:multiop:nodocstatus:skip:this".equals(id)) {
+ it.remove();
+ } else if ("userdoc:test:12345:batch:nodocstatus:keep:this".equals(id)) {
+ //nada
+ } else if ("userdoc:test:12345:batch:nodocstatus:skip:this".equals(id)) {
+ it.remove();
+ }
+ }
+ return Progress.DONE;
+ }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java
new file mode 100644
index 00000000000..8dc811bece6
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/jdisc/DocumentProcessingTaskPrioritizationTestCase.java
@@ -0,0 +1,131 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.jdisc;
+
+import com.yahoo.docproc.Processing;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DocumentProcessingTaskPrioritizationTestCase {
+
+ @Test
+ public void proritization() {
+ Queue<DocumentProcessingTask> queue = new PriorityBlockingQueue<>();
+
+ DocumentProcessingTask highest = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST);
+ DocumentProcessingTask veryhigh = new TestDocumentProcessingTask(DocumentProtocol.Priority.VERY_HIGH);
+ DocumentProcessingTask high1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGH_1);
+ DocumentProcessingTask normal_1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1);
+ DocumentProcessingTask low_1 = new TestDocumentProcessingTask(DocumentProtocol.Priority.LOW_1);
+ DocumentProcessingTask verylow = new TestDocumentProcessingTask(DocumentProtocol.Priority.VERY_LOW);
+ DocumentProcessingTask lowest = new TestDocumentProcessingTask(DocumentProtocol.Priority.LOWEST);
+
+ DocumentProcessingTask normal_2 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1);
+ DocumentProcessingTask normal_3 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1);
+ DocumentProcessingTask normal_4 = new TestDocumentProcessingTask(DocumentProtocol.Priority.NORMAL_1);
+
+ DocumentProcessingTask highest_2 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST);
+ DocumentProcessingTask highest_3 = new TestDocumentProcessingTask(DocumentProtocol.Priority.HIGHEST);
+
+
+ queue.add(highest);
+ queue.add(veryhigh);
+ queue.add(high1);
+ queue.add(normal_1);
+ queue.add(low_1);
+ queue.add(verylow);
+ queue.add(lowest);
+
+ queue.add(normal_2);
+ queue.add(normal_3);
+ queue.add(normal_4);
+
+ queue.add(highest_2);
+ queue.add(highest_3);
+
+ assertThat(queue.poll(), sameInstance(highest));
+ assertThat(queue.poll(), sameInstance(highest_2));
+ assertThat(queue.poll(), sameInstance(highest_3));
+ assertThat(queue.poll(), sameInstance(veryhigh));
+ assertThat(queue.poll(), sameInstance(high1));
+ assertThat(queue.poll(), sameInstance(normal_1));
+ assertThat(queue.poll(), sameInstance(normal_2));
+ assertThat(queue.poll(), sameInstance(normal_3));
+ assertThat(queue.poll(), sameInstance(normal_4));
+ assertThat(queue.poll(), sameInstance(low_1));
+ assertThat(queue.poll(), sameInstance(verylow));
+ assertThat(queue.poll(), sameInstance(lowest));
+ assertThat(queue.poll(), nullValue());
+ }
+
+ private class TestDocumentProcessingTask extends DocumentProcessingTask {
+ private TestDocumentProcessingTask(DocumentProtocol.Priority priority) {
+ super(new TestRequestContext(priority), null, null);
+ }
+ }
+
+ private class TestRequestContext implements RequestContext {
+ private final DocumentProtocol.Priority priority;
+
+ public TestRequestContext(DocumentProtocol.Priority priority) {
+ this.priority = priority;
+ }
+
+ @Override
+ public List<Processing> getProcessings() {
+ return null;
+ }
+
+ @Override
+ public void skip() {
+ }
+
+ @Override
+ public void processingDone(List<Processing> processing) {
+ }
+
+ @Override
+ public void processingFailed(ErrorCode error, String msg) {
+ }
+
+ @Override
+ public void processingFailed(Exception exception) {
+ }
+
+ @Override
+ public int getApproxSize() {
+ return 0;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority.getValue();
+ }
+
+ @Override
+ public boolean isProcessable() {
+ return true;
+ }
+
+ @Override
+ public URI getUri() {
+ return null;
+ }
+
+ @Override
+ public String getServiceName() {
+ return null;
+ }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/proxy/SchemaMappingAndAccessesTest.java b/docproc/src/test/java/com/yahoo/docproc/proxy/SchemaMappingAndAccessesTest.java
new file mode 100644
index 00000000000..e5d3a4dc137
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/proxy/SchemaMappingAndAccessesTest.java
@@ -0,0 +1,566 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.proxy;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import com.yahoo.collections.Pair;
+import com.yahoo.config.docproc.SchemamappingConfig;
+import com.yahoo.docproc.Accesses;
+import com.yahoo.docproc.Accesses.Field;
+import com.yahoo.docproc.Call;
+import com.yahoo.docproc.DocumentProcessingAbstractTestCase.TestDocumentProcessor1;
+import com.yahoo.docproc.DocumentProcessor;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.DataType;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentType;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.StructDataType;
+import com.yahoo.document.annotation.Annotation;
+import com.yahoo.document.annotation.AnnotationType;
+import com.yahoo.document.annotation.SpanTree;
+import com.yahoo.document.datatypes.Array;
+import com.yahoo.document.datatypes.IntegerFieldValue;
+import com.yahoo.document.datatypes.StringFieldValue;
+import com.yahoo.document.datatypes.Struct;
+import com.yahoo.document.datatypes.StructuredFieldValue;
+import com.yahoo.document.update.FieldUpdate;
+
+public class SchemaMappingAndAccessesTest extends junit.framework.TestCase {
+
+ private Document getDoc() {
+ DocumentType type = new DocumentType("album");
+ AnnotationType personType = new AnnotationType("person");
+ Annotation person = new Annotation(personType);
+ type.addField("title", DataType.STRING);
+ type.addField("artist", DataType.STRING);
+ type.addField("guitarist", DataType.STRING);
+ type.addField("year", DataType.INT);
+ type.addField("labels", DataType.getArray(DataType.STRING));
+ Document doc = new Document(type, new DocumentId("doc:map:test:1"));
+ doc.setFieldValue("title", new StringFieldValue("Black Rock"));
+ StringFieldValue joe = new StringFieldValue("Joe Bonamassa");
+ joe.setSpanTree(new SpanTree("mytree").annotate(person));
+ doc.setFieldValue("artist", joe);
+ doc.setFieldValue("year", new IntegerFieldValue(2010));
+ Array<StringFieldValue> labels = new Array<>(type.getField("labels").getDataType());
+ labels.add(new StringFieldValue("audun"));
+ labels.add(new StringFieldValue("tylden"));
+ doc.setFieldValue("labels", labels);
+
+ StructDataType personStructType = new StructDataType("artist");
+ personStructType.addField(new com.yahoo.document.Field("firstname", DataType.STRING));
+ personStructType.addField(new com.yahoo.document.Field("lastname", DataType.STRING));
+ type.addField("listeners", DataType.getArray(personStructType));
+
+ Array<Struct> listeners = new Array<>(type.getField("listeners").getDataType());
+
+ Struct listenerOne = new Struct(personStructType);
+ listenerOne.setFieldValue("firstname", new StringFieldValue("per"));
+ listenerOne.setFieldValue("lastname", new StringFieldValue("olsen"));
+ Struct listenerTwo = new Struct(personStructType);
+ listenerTwo.setFieldValue("firstname", new StringFieldValue("anders"));
+ listenerTwo.setFieldValue("lastname", new StringFieldValue("and"));
+
+ listeners.add(listenerOne);
+ listeners.add(listenerTwo);
+
+ doc.setFieldValue("listeners", listeners);
+
+ return doc;
+ }
+
+ public void testMappingArrays() {
+ Document doc = getDoc();
+ DocumentProcessor proc = new TestMappingArrayProcessor();
+
+ Map<String, String> fieldMap = new HashMap<>();
+ fieldMap.put("label", "labels[0]");
+ ProxyDocument mapped = new ProxyDocument(proc, doc, fieldMap);
+
+ Processing p = Processing.of(new DocumentPut(mapped));
+ proc.process(p);
+
+ assertEquals(2, ((Array<StringFieldValue>) doc.getFieldValue("labels")).size());
+ assertEquals(new StringFieldValue("EMI"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(0));
+ assertEquals(new StringFieldValue("tylden"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(1));
+
+
+ fieldMap.clear();
+ fieldMap.put("label", "labels[2]");
+ mapped = new ProxyDocument(proc, doc, fieldMap);
+
+ p = Processing.of(new DocumentPut(mapped));
+ try {
+ proc.process(p);
+ fail("Should not have worked");
+ } catch (IllegalArgumentException iae) {
+ //ok!
+ }
+ assertEquals(2, ((Array<StringFieldValue>) doc.getFieldValue("labels")).size());
+ assertEquals(new StringFieldValue("EMI"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(0));
+ assertEquals(new StringFieldValue("tylden"), ((Array<StringFieldValue>) doc.getFieldValue("labels")).get(1));
+ }
+
+ public void testMappingStructsInArrays() {
+ Document doc = getDoc();
+ DocumentProcessor proc = new TestMappingStructInArrayProcessor();
+
+ Map<String, String> fieldMap = new HashMap<>();
+ fieldMap.put("name", "listeners[0].firstname");
+ ProxyDocument mapped = new ProxyDocument(proc, doc, fieldMap);
+
+ Processing p = Processing.of(new DocumentPut(mapped));
+ proc.process(p);
+
+ assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size());
+ assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString()));
+ assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString()));
+ assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString()));
+ assertEquals("and", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname")).getString()));
+
+
+ fieldMap.clear();
+ fieldMap.put("name", "listeners[2].firstname");
+ mapped = new ProxyDocument(proc, doc, fieldMap);
+
+ p = Processing.of(new DocumentPut(mapped));
+ try {
+ proc.process(p);
+ fail("Should not have worked");
+ } catch (IllegalArgumentException iae) {
+ //ok!
+ }
+ assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size());
+ assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString()));
+ assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString()));
+ assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString()));
+ assertEquals("and", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname")).getString()));
+
+
+ //test remove:
+ proc = new TestRemovingMappingStructInArrayProcessor();
+
+ fieldMap.clear();
+ fieldMap.put("name", "listeners[1].lastname");
+ mapped = new ProxyDocument(proc, doc, fieldMap);
+
+ p = Processing.of(new DocumentPut(mapped));
+ proc.process(p);
+
+ assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size());
+ assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString()));
+ assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString()));
+ assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString()));
+ assertNull(((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname"));
+
+
+ fieldMap.clear();
+ fieldMap.put("name", "listeners[2].lastname");
+ mapped = new ProxyDocument(proc, doc, fieldMap);
+
+ p = Processing.of(new DocumentPut(mapped));
+ try {
+ proc.process(p);
+ fail("Should not have worked");
+ } catch (IllegalArgumentException iae) {
+ //ok!
+ }
+ assertEquals(2, ((Array<Struct>) doc.getFieldValue("listeners")).size());
+ assertEquals("peter", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("firstname")).getString()));
+ assertEquals("olsen", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(0).getFieldValue("lastname")).getString()));
+ assertEquals("anders", (((StringFieldValue)((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("firstname")).getString()));
+ assertNull(((Array<Struct>) doc.getFieldValue("listeners")).get(1).getFieldValue("lastname"));
+
+ }
+
+
+ public void testMappingSpanTrees() {
+ Document doc = getDoc();
+ Map<String, String> fieldMap = new HashMap<>();
+ fieldMap.put("t", "title");
+ fieldMap.put("a", "artist");
+ fieldMap.put("g", "guitarist");
+ ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap);
+ Iterator<SpanTree> itSpanTreesDoc = ((StringFieldValue) doc.getFieldValue("artist")).getSpanTrees().iterator();
+ Iterator<Annotation> itAnnotDoc = itSpanTreesDoc.next().iterator();
+ Iterator<SpanTree> itSpanTreesMapped = ((StringFieldValue) mapped.getFieldValue("artist")).getSpanTrees().iterator();
+ Iterator<Annotation> itAnnotMapped = itSpanTreesMapped.next().iterator();
+
+ assertEquals(itAnnotDoc.next().getType().getName(), "person");
+ assertFalse(itAnnotDoc.hasNext());
+ assertEquals(itAnnotMapped.next().getType().getName(), "person");
+ assertFalse(itAnnotMapped.hasNext());
+
+ AnnotationType guitaristType = new AnnotationType("guitarist");
+ Annotation guitarist = new Annotation(guitaristType);
+ StringFieldValue bona = new StringFieldValue("Bonamassa");
+ bona.setSpanTree(new SpanTree("mytree").annotate(guitarist));
+ StringFieldValue clapton = new StringFieldValue("Clapton");
+ mapped.setFieldValue("a", bona);
+ mapped.setFieldValue("g", clapton);
+
+ itSpanTreesDoc = ((StringFieldValue) doc.getFieldValue("artist")).getSpanTrees().iterator();
+ itAnnotDoc = itSpanTreesDoc.next().iterator();
+ itSpanTreesMapped = ((StringFieldValue) mapped.getFieldValue("artist")).getSpanTrees().iterator();
+ itAnnotMapped = itSpanTreesMapped.next().iterator();
+
+ assertEquals(itAnnotDoc.next().getType().getName(), "guitarist");
+ assertFalse(itAnnotDoc.hasNext());
+ assertEquals(itAnnotMapped.next().getType().getName(), "guitarist");
+ assertFalse(itAnnotMapped.hasNext());
+
+ assertSame(((StringFieldValue) doc.getFieldValue("artist")).getSpanTrees().iterator().next(), ((StringFieldValue) mapped.getFieldValue("a")).getSpanTrees().iterator().next());
+ //assertSame(clapton, mapped.getFieldValue("g"));
+ //assertSame(bona, mapped.getFieldValue("a"));
+ }
+
+ public void testMappedDoc() {
+ Document doc = getDoc();
+ Map<String, String> fieldMap = new HashMap<>();
+ fieldMap.put("t", "title");
+ fieldMap.put("a", "artist");
+ ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap);
+ //Document mapped=doc;
+ //mapped.setFieldMap(fieldMap);
+ assertEquals(new StringFieldValue("Black Rock"), mapped.getFieldValue("t"));
+ //assertEquals(new StringFieldValue("Black Rock"), proxy.getFieldValue(new com.yahoo.document.Field("t")));
+ assertEquals(new StringFieldValue("Joe Bonamassa").getWrappedValue(), mapped.getFieldValue("a").getWrappedValue());
+ mapped.setFieldValue("t", new StringFieldValue("The Ballad Of John Henry"));
+ StringFieldValue bona = new StringFieldValue("Bonamassa");
+ mapped.setFieldValue("a", bona);
+ //mapped.setFieldValue("a", new StringFieldValue("Bonamassa"));
+ assertEquals(new StringFieldValue("The Ballad Of John Henry"), doc.getFieldValue("title"));
+ assertEquals(new StringFieldValue("The Ballad Of John Henry"), mapped.getFieldValue("t"));
+ assertEquals(new StringFieldValue("Bonamassa"), doc.getFieldValue("artist"));
+ assertEquals(new StringFieldValue("Bonamassa"), mapped.getFieldValue("a"));
+ mapped.setFieldValue("a", mapped.getFieldValue("a") + "Hughes");
+ assertEquals(new StringFieldValue("BonamassaHughes"), mapped.getFieldValue("a"));
+ // Verify consistency when using string values to manipluate annotation span trees
+ StringFieldValue unmapped1 = (StringFieldValue) doc.getFieldValue("artist");
+ StringFieldValue unmapped2 = (StringFieldValue) doc.getFieldValue("artist");
+ assertTrue(unmapped1==unmapped2);
+ unmapped1.setSpanTree(new SpanTree("test"));
+ assertEquals(unmapped2.getSpanTree("test").getName(), "test");
+
+ StringFieldValue mapped1 = (StringFieldValue) mapped.getFieldValue("a");
+ mapped1.setSpanTree(new SpanTree("test2"));
+ StringFieldValue mapped2 = (StringFieldValue) mapped.getFieldValue("a");
+ assertTrue(mapped1==mapped2);
+ assertEquals(mapped2.getSpanTree("test2").getName(), "test2");
+
+ mapped.removeFieldValue("a");
+ assertEquals(mapped.getFieldValue("a"), null);
+ mapped.removeFieldValue(mapped.getField("t"));
+ assertEquals(mapped.getFieldValue("t"), null);
+ mapped.setFieldValue("a", new StringFieldValue("Bonamassa"));
+ assertEquals(new StringFieldValue("Bonamassa"), doc.getFieldValue("artist"));
+ mapped.removeFieldValue("a");
+ assertEquals(mapped.getFieldValue("a"), null);
+ }
+
+ public void testMappedDocAPI() {
+ Document doc = getDoc();
+ Map<String, String> fieldMap = new HashMap<>();
+ fieldMap.put("t", "title");
+ fieldMap.put("a", "artist");
+ ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap);
+ assertEquals(mapped.getFieldValue("title"), doc.getFieldValue("title"));
+ assertEquals(mapped.getFieldValue(new com.yahoo.document.Field("title")), doc.getFieldValue((new com.yahoo.document.Field("title"))));
+ mapped.setFieldValue("title", "foo");
+ assertEquals(doc.getFieldValue("title").getWrappedValue(), "foo");
+ assertEquals(mapped.getWrappedDocumentOperation().getId().toString(), "doc:map:test:1");
+ assertEquals(doc, mapped);
+ assertEquals(doc.toString(), mapped.toString());
+ assertEquals(doc.hashCode(), mapped.hashCode());
+ assertEquals(doc.clone(), mapped.clone());
+ assertEquals(doc.iterator().hasNext(), mapped.iterator().hasNext());
+ assertEquals(doc.getId(), mapped.getId());
+ assertEquals(doc.getDataType(), mapped.getDataType());
+ mapped.setLastModified(56l);
+ assertEquals(doc.getLastModified(), (Long)56l);
+ assertEquals(mapped.getLastModified(), (Long)56l);
+ mapped.setId(new DocumentId("doc:map:test:2"));
+ assertEquals(mapped.getId().toString(), "doc:map:test:2");
+ assertEquals(doc.getId().toString(), "doc:map:test:2");
+ assertEquals(doc.getHeader(), mapped.getHeader());
+ assertEquals(doc.getBody(), mapped.getBody());
+ assertEquals(doc.getSerializedSize(), mapped.getSerializedSize());
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ByteArrayOutputStream bos2 = new ByteArrayOutputStream();
+ mapped.serialize(bos);
+ doc.serialize(bos2);
+ assertEquals(bos.toString(), bos2.toString());
+ assertEquals(mapped.toXml(), doc.toXml());
+ assertEquals(mapped.getFieldCount(), doc.getFieldCount());
+ assertTrue(mapped.getDocument()==doc);
+
+ mapped.clear();
+ assertNull(mapped.getFieldValue("title"));
+ assertNull(doc.getFieldValue("title"));
+ mapped.setDataType(new DocumentType("newType"));
+ assertEquals(doc.getDataType().getName(), "newType");
+ }
+
+ public void testMappedDocUpdateAPI() {
+ Document doc = getDoc();
+ DocumentType type = doc.getDataType();
+ DocumentUpdate dud = new DocumentUpdate(type, new DocumentId("doc:map:test:1"));
+ FieldUpdate assignSingle = FieldUpdate.createAssign(type.getField("title"), new StringFieldValue("something"));
+ Map<String, String> fieldMap = new HashMap<>();
+ fieldMap.put("t", "title");
+ fieldMap.put("a", "artist");
+ ProxyDocumentUpdate pup = new ProxyDocumentUpdate(dud, fieldMap);
+ pup.addFieldUpdate(assignSingle);
+ assertEquals(pup.getFieldUpdates(), dud.getFieldUpdates());
+ assertEquals(pup.getDocumentType(), dud.getDocumentType());
+ assertEquals(pup.getFieldUpdate(new com.yahoo.document.Field("title")).size(), 1);
+ assertEquals(pup.getFieldUpdate(0), dud.getFieldUpdate(0));
+ assertEquals(pup.getFieldUpdate("title"), dud.getFieldUpdate("title"));
+ assertEquals(pup.getId(), dud.getId());
+ assertEquals(pup.getType(), dud.getType());
+ assertEquals(pup.applyTo(doc), dud);
+ assertEquals(doc.getFieldValue("title").getWrappedValue(), "something");
+ assertEquals(pup, dud);
+ assertEquals(pup.hashCode(), dud.hashCode());
+ assertEquals(pup.toString(), dud.toString());
+ assertEquals(pup.size(), dud.size());
+ assertEquals(pup.getWrappedDocumentOperation().getId().toString(), "doc:map:test:1");
+ }
+
+ public void testMappedDocStruct() {
+ StructDataType materialsStructType = new StructDataType("materialstype");
+ materialsStructType.addField(new com.yahoo.document.Field("ceiling", DataType.STRING));
+ materialsStructType.addField(new com.yahoo.document.Field("walls", DataType.STRING));
+
+ DocumentType docType = new DocumentType("album");
+ docType.addField("title", DataType.STRING);
+ docType.addField("artist", DataType.STRING);
+ StructDataType storeStructType = new StructDataType("storetype");
+ storeStructType.addField(new com.yahoo.document.Field("name", DataType.STRING));
+ storeStructType.addField(new com.yahoo.document.Field("city", DataType.STRING));
+ storeStructType.addField(new com.yahoo.document.Field("materials", materialsStructType));
+ docType.addField("store", storeStructType);
+
+ Document doc = new Document(docType, new DocumentId("doc:map:test:1"));
+ doc.setFieldValue("title", new StringFieldValue("Black Rock"));
+ doc.setFieldValue("artist", new StringFieldValue("Joe Bonamassa"));
+ Struct material = new Struct(materialsStructType);
+ material.setFieldValue("ceiling", new StringFieldValue("wood"));
+ material.setFieldValue("walls", new StringFieldValue("brick"));
+ Struct store = new Struct(storeStructType);
+ store.setFieldValue("name", new StringFieldValue("Platekompaniet"));
+ store.setFieldValue("city", new StringFieldValue("Trondheim"));
+ store.setFieldValue(storeStructType.getField("materials"), material);
+ doc.setFieldValue(docType.getField("store"), store);
+
+ Map<String, String> fieldMap = new HashMap<>();
+ fieldMap.put("t", "title");
+ fieldMap.put("c", "store.city");
+ fieldMap.put("w", "store.materials.walls");
+ ProxyDocument mapped = new ProxyDocument(new TestDocumentProcessor1(), doc, fieldMap);
+ assertEquals(new StringFieldValue("Trondheim"), mapped.getFieldValue("c"));
+ assertEquals(new StringFieldValue("Black Rock"), mapped.getFieldValue("t"));
+ assertEquals(new StringFieldValue("brick"), mapped.getFieldValue("w"));
+ assertEquals(new StringFieldValue("brick"), material.getFieldValue("walls"));
+ mapped.setFieldValue("c", new StringFieldValue("Steinkjer"));
+ mapped.setFieldValue("w", new StringFieldValue("plaster"));
+ assertEquals(new StringFieldValue("plaster"), mapped.getFieldValue("w"));
+ assertEquals(new StringFieldValue("plaster"), material.getFieldValue("walls"));
+ assertEquals(new StringFieldValue("Steinkjer"), store.getFieldValue("city"));
+ assertEquals(new StringFieldValue("Steinkjer"), mapped.getFieldValue("c"));
+ assertEquals(new StringFieldValue("Steinkjer"), mapped.getFieldValue("c"));
+ mapped.setFieldValue("c", new StringFieldValue("Levanger"));
+ assertEquals(new StringFieldValue("Levanger"), store.getFieldValue("city"));
+ assertEquals(new StringFieldValue("Levanger"), mapped.getFieldValue("c"));
+ mapped.setFieldValue("c", mapped.getFieldValue("c") + "Kommune");
+ assertEquals(new StringFieldValue("LevangerKommune"), mapped.getFieldValue("c"));
+ //mapped.set(mapped.getField("c"), mapped.get("c")+"Styre");
+ //assertEquals(new StringFieldValue("LevangerKommuneStyre"), mapped.getFieldValue("c"));
+ }
+
+ public void testSchemaMap() {
+ SchemaMap map = new SchemaMap();
+ map.addMapping("mychain", "com.yahoo.MyDocProc", "mydoctype", "inDoc1", "inProc1");
+ map.addMapping("mychain", "com.yahoo.MyDocProc", "mydoctype", "inDoc2", "inProc2");
+ Map<Pair<String, String>, String> cMap = map.chainMap("mychain", "com.yahoo.MyDocProc");
+ assertEquals("inDoc1", cMap.get(new Pair<>("mydoctype", "inProc1")));
+ assertEquals("inDoc2", cMap.get(new Pair<>("mydoctype", "inProc2")));
+ assertNull(cMap.get(new Pair<>("invalidtype", "inProc2")));
+ Map<Pair<String, String>, String> noMap = map.chainMap("invalidchain", "com.yahoo.MyDocProc");
+ Map<Pair<String, String>, String> noMap2 = map.chainMap("mychain", "com.yahoo.MyInvalidDocProc");
+ assertTrue(noMap.isEmpty());
+ assertTrue(noMap2.isEmpty());
+
+ DocumentProcessor proc = new TestDocumentProcessor1();
+ proc.setFieldMap(cMap);
+ Map<String, String> dMap = proc.getDocMap("mydoctype");
+ assertEquals("inDoc1", dMap.get("inProc1"));
+ assertEquals("inDoc2", dMap.get("inProc2"));
+ }
+
+ public void testSchemaMapKey() {
+ SchemaMap map = new SchemaMap(null);
+ SchemaMap.SchemaMapKey key1 = map.new SchemaMapKey("chain", "docproc", "doctype", "from");
+ SchemaMap.SchemaMapKey key1_1 = map.new SchemaMapKey("chain", "docproc", "doctype", "from");
+ SchemaMap.SchemaMapKey key2 = map.new SchemaMapKey("chain", "docproc", "doctype2", "from");
+ assertTrue(key1.equals(key1_1));
+ assertFalse(key1.equals(key2));
+ }
+
+ public void testSchemaMapConfig() {
+ SchemaMap map = new SchemaMap(null);
+ SchemamappingConfig.Builder scb = new SchemamappingConfig.Builder();
+ scb.fieldmapping(new SchemamappingConfig.Fieldmapping.Builder().chain("mychain").docproc("mydocproc").doctype("mydoctype").
+ indocument("myindoc").inprocessor("myinprocessor"));
+ map.configure(new SchemamappingConfig(scb));
+ assertEquals(map.chainMap("mychain", "mydocproc").get(new Pair<>("mydoctype", "myinprocessor")), "myindoc");
+ }
+
+ public void testSchemaMapNoDocType() {
+ SchemaMap map = new SchemaMap(null);
+ map.addMapping("mychain", "com.yahoo.MyDocProc", null, "inDoc1", "inProc1");
+ map.addMapping("mychain", "com.yahoo.MyDocProc", null, "inDoc2", "inProc2");
+ Map<Pair<String, String>, String> cMap = map.chainMap("mychain", "com.yahoo.MyDocProc");
+ DocumentProcessor proc = new TestDocumentProcessor1();
+ proc.setFieldMap(cMap);
+ Map<String, String> dMap = proc.getDocMap("mydoctype");
+ assertEquals("inDoc1", dMap.get("inProc1"));
+ assertEquals("inDoc2", dMap.get("inProc2"));
+ }
+
+ public void testProxyAndSecure() {
+ DocumentProcessor procOK = new TestDPSecure();
+ Map<Pair<String, String>, String> fieldMap = new HashMap<>();
+ fieldMap.put(new Pair<>("album", "titleMapped"), "title");
+ procOK.setFieldMap(fieldMap);
+ DocumentPut put = new DocumentPut(getDoc());
+ Document proxyDoc = new Call(procOK).configDoc(procOK, put).getDocument();
+ procOK.process(Processing.of(new DocumentPut(proxyDoc)));
+ assertEquals(proxyDoc.getFieldValue("title").toString(), "MyTitle MyTitle");
+ }
+
+ public void testProxyAndSecureSecureFailing() {
+ DocumentProcessor procInsecure = new TestDPInsecure();
+ Map<Pair<String, String>, String> fieldMap = new HashMap<>();
+ fieldMap.put(new Pair<>("album", "titleMapped"), "title");
+ procInsecure.setFieldMap(fieldMap);
+ DocumentPut put = new DocumentPut(getDoc());
+ Document doc = new Call(procInsecure).configDoc(procInsecure, put).getDocument();
+ try {
+ procInsecure.process(Processing.of(new DocumentPut(doc)));
+ fail("Insecure docproc went through");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().matches(".*allowed.*"));
+ }
+ //assertEquals(doc.get("title"), "MyTitle");
+ }
+
+ /**
+ * To make it less likely to break schema mapping, we enforce that ProxyDocument does wrap every public
+ * non-static, non-final method on Document and StructuredFieldValue
+ */
+ public void testVerifyProxyDocumentOverridesEverything() {
+ List<Method> allPublicFromProxyDocument = new ArrayList<>();
+ for (Method m : ProxyDocument.class.getDeclaredMethods()) {
+ if (Modifier.isPublic(m.getModifiers())) {
+ allPublicFromProxyDocument.add(m);
+ }
+ }
+ List<Method> allPublicFromDoc = new ArrayList<>();
+ for (Method m : Document.class.getDeclaredMethods()) {
+ if (mustBeOverriddenInProxyDocument(m)) {
+ allPublicFromDoc.add(m);
+ }
+ }
+ for (Method m : StructuredFieldValue.class.getDeclaredMethods()) {
+ if (mustBeOverriddenInProxyDocument(m)) {
+ allPublicFromDoc.add(m);
+ }
+ }
+
+ for (Method m : allPublicFromDoc) {
+ boolean thisOneOk=false;
+ for (Method pdM : allPublicFromProxyDocument) {
+ if (sameNameAndParams(m, pdM)) thisOneOk=true;
+ }
+ if (!thisOneOk) {
+ throw new RuntimeException("ProxyDocument must override all public methods from Document. " +
+ "Missing: '"+m+"'. If the method doesn't need field mapping or @Accesses check, just " +
+ "override it and delegate the call to 'doc'.");
+
+ }
+ }
+ }
+
+ private boolean mustBeOverriddenInProxyDocument(Method m) {
+ if (!Modifier.isPublic(m.getModifiers())) return false;
+ if (Modifier.isStatic(m.getModifiers())) return false;
+ if (Modifier.isFinal(m.getModifiers())) return false;
+ return true;
+ }
+
+ private boolean sameNameAndParams(Method m1, Method m2) {
+ if (!m1.getName().equals(m2.getName())) return false;
+ if (m1.getParameterTypes().length!=m2.getParameterTypes().length) return false;
+ for (int i = 0; i<m1.getParameterTypes().length; i++) {
+ if (!m1.getParameterTypes()[i].equals(m2.getParameterTypes()[i])) return false;
+ }
+ return true;
+ }
+
+ @Accesses(value = { @Field(dataType = "String", description = "", name = "titleMapped") })
+ public static class TestDPSecure extends DocumentProcessor {
+
+ public Progress process(Processing processing) {
+ Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();
+ document.setFieldValue("titleMapped", new StringFieldValue("MyTitle"));
+ document.setFieldValue("titleMapped", new StringFieldValue(document.getFieldValue("titleMapped").toString() + " MyTitle"));
+ return Progress.DONE;
+ }
+ }
+
+ @Accesses(value = { @Field(dataType = "String", description = "", name = "titleMappedFoo") })
+ public static class TestDPInsecure extends DocumentProcessor {
+
+ public Progress process(Processing processing) {
+ Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();
+ document.setFieldValue("titleMapped", new StringFieldValue("MyTitle"));
+ document.setFieldValue("titleMapped", new StringFieldValue(document.getFieldValue("titleMapped").toString() + " MyTitle"));
+ return Progress.DONE;
+ }
+ }
+
+ public static class TestMappingArrayProcessor extends DocumentProcessor {
+ public Progress process(Processing processing) {
+ Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();
+ document.setFieldValue("label", new StringFieldValue("EMI"));
+ return Progress.DONE;
+ }
+ }
+
+ public static class TestMappingStructInArrayProcessor extends DocumentProcessor {
+ public Progress process(Processing processing) {
+ Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();;
+ document.setFieldValue("name", new StringFieldValue("peter"));
+ return Progress.DONE;
+ }
+ }
+
+ public static class TestRemovingMappingStructInArrayProcessor extends DocumentProcessor {
+ public Progress process(Processing processing) {
+ Document document = ((DocumentPut)processing.getDocumentOperations().get(0)).getDocument();;
+ document.removeFieldValue("name");
+ return Progress.DONE;
+ }
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/util/SplitterJoinerTestCase.java b/docproc/src/test/java/com/yahoo/docproc/util/SplitterJoinerTestCase.java
new file mode 100644
index 00000000000..2d5a61fa005
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/util/SplitterJoinerTestCase.java
@@ -0,0 +1,85 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.docproc.util;
+
+import com.yahoo.config.subscription.ConfigGetter;
+import com.yahoo.config.docproc.SplitterJoinerDocumentProcessorConfig;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.config.DocumentmanagerConfig;
+import com.yahoo.docproc.Processing;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.datatypes.Array;
+import com.yahoo.document.datatypes.StringFieldValue;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class SplitterJoinerTestCase {
+
+ @Test
+ public void testSplitJoin() {
+ ConfigGetter<SplitterJoinerDocumentProcessorConfig> getter = new ConfigGetter<>(SplitterJoinerDocumentProcessorConfig.class);
+ ConfigGetter<DocumentmanagerConfig> docManGetter = new ConfigGetter<>(DocumentmanagerConfig.class);
+
+ SplitterJoinerDocumentProcessorConfig cfg =
+ getter.getConfig("file:src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg");
+ DocumentmanagerConfig docManCfg =
+ docManGetter.getConfig("file:src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg");
+
+ SplitterDocumentProcessor splitter = new SplitterDocumentProcessor(cfg, docManCfg);
+
+ DocumentTypeManager manager = splitter.manager;
+
+
+ /**** Create documents: ****/
+
+ Document inner1 = new Document(manager.getDocumentType("docindoc"), "doc:inner:number:one");
+ inner1.setFieldValue("name", new StringFieldValue("Donald Duck"));
+ inner1.setFieldValue("content", new StringFieldValue("Lives in Duckburg"));
+ Document inner2 = new Document(manager.getDocumentType("docindoc"), "doc:inner:number:two");
+ inner2.setFieldValue("name", new StringFieldValue("Uncle Scrooge"));
+ inner2.setFieldValue("content", new StringFieldValue("Lives in Duckburg, too."));
+
+ Array<Document> innerArray = (Array<Document>) manager.getDocumentType("outerdoc").getField("innerdocuments").getDataType().createFieldValue();
+ innerArray.add(inner1);
+ innerArray.add(inner2);
+
+ Document outer = new Document(manager.getDocumentType("outerdoc"), "doc:outer:the:only:one");
+ outer.setFieldValue("innerdocuments", innerArray);
+
+ /**** End create documents ****/
+
+
+ Processing p = Processing.of(new DocumentPut(outer));
+ splitter.process(p);
+
+ assertEquals(2, p.getDocumentOperations().size());
+ assertThat(((DocumentPut)(p.getDocumentOperations().get(0))).getDocument(), sameInstance(inner1));
+ assertThat(((DocumentPut)(p.getDocumentOperations().get(1))).getDocument(), sameInstance(inner2));
+ assertThat(((DocumentPut)(p.getVariable(cfg.contextFieldName()))).getDocument(), sameInstance(outer));
+ assertThat(outer.getFieldValue("innerdocuments"), sameInstance(innerArray));
+ assertTrue(innerArray.isEmpty());
+
+
+ JoinerDocumentProcessor joiner = new JoinerDocumentProcessor(cfg, docManCfg);
+
+ joiner.process(p);
+
+ assertThat(p.getDocumentOperations().size(), equalTo(1));
+ assertThat(((DocumentPut)p.getDocumentOperations().get(0)).getDocument(), sameInstance(outer));
+ assertThat(p.getVariable(cfg.contextFieldName()), nullValue());
+ assertThat(outer.getFieldValue("innerdocuments"), sameInstance(innerArray));
+ assertThat(innerArray.size(), equalTo(2));
+ assertThat(innerArray.get(0), sameInstance(inner1));
+ assertThat(innerArray.get(1), sameInstance(inner2));
+ }
+
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/util/docindoc.sd b/docproc/src/test/java/com/yahoo/docproc/util/docindoc.sd
new file mode 100644
index 00000000000..83bc4254fb2
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/util/docindoc.sd
@@ -0,0 +1,7 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+search docindoc {
+ document docindoc {
+ field name type string { header }
+ field content type string { body }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg b/docproc/src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg
new file mode 100644
index 00000000000..3347c3127b5
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/util/documentmanager.docindoc.cfg
@@ -0,0 +1,42 @@
+enablecompression false
+datatype[7]
+datatype[0].id -1407012075
+datatype[0].structtype[1]
+datatype[0].structtype[0].name "outerdoc.body"
+datatype[0].structtype[0].version 0
+datatype[0].structtype[0].field[1]
+datatype[0].structtype[0].field[0].datatype -2035324352
+datatype[0].structtype[0].field[0].name "innerdocuments"
+datatype[1].id -1686125086
+datatype[1].structtype[1]
+datatype[1].structtype[0].name "docindoc.header"
+datatype[1].structtype[0].version 0
+datatype[1].structtype[0].field[1]
+datatype[1].structtype[0].field[0].datatype 2
+datatype[1].structtype[0].field[0].name "name"
+datatype[2].id -2035324352
+datatype[2].arraytype[1]
+datatype[2].arraytype[0].datatype 1447635645
+datatype[3].id -2040625920
+datatype[3].structtype[1]
+datatype[3].structtype[0].name "outerdoc.header"
+datatype[3].structtype[0].version 0
+datatype[4].id 1447635645
+datatype[4].documenttype[1]
+datatype[4].documenttype[0].bodystruct 2030224503
+datatype[4].documenttype[0].headerstruct -1686125086
+datatype[4].documenttype[0].name "docindoc"
+datatype[4].documenttype[0].version 0
+datatype[5].id 1748635999
+datatype[5].documenttype[1]
+datatype[5].documenttype[0].bodystruct -1407012075
+datatype[5].documenttype[0].headerstruct -2040625920
+datatype[5].documenttype[0].name "outerdoc"
+datatype[5].documenttype[0].version 0
+datatype[6].id 2030224503
+datatype[6].structtype[1]
+datatype[6].structtype[0].name "docindoc.body"
+datatype[6].structtype[0].version 0
+datatype[6].structtype[0].field[1]
+datatype[6].structtype[0].field[0].datatype 2
+datatype[6].structtype[0].field[0].name "content"
diff --git a/docproc/src/test/java/com/yahoo/docproc/util/outerdoc.sd b/docproc/src/test/java/com/yahoo/docproc/util/outerdoc.sd
new file mode 100644
index 00000000000..dbdf3c32cd5
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/util/outerdoc.sd
@@ -0,0 +1,6 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+search outerdoc {
+ document outerdoc {
+ field innerdocuments type array<docindoc> { body }
+ }
+}
diff --git a/docproc/src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg b/docproc/src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg
new file mode 100644
index 00000000000..16c26f7a6d8
--- /dev/null
+++ b/docproc/src/test/java/com/yahoo/docproc/util/splitter-joiner-document-processor.cfg
@@ -0,0 +1,2 @@
+documentTypeName "outerdoc"
+arrayFieldName "innerdocuments"
diff --git a/docproc/src/test/vespa-configdef/string.def b/docproc/src/test/vespa-configdef/string.def
new file mode 100644
index 00000000000..cfc0ecc578c
--- /dev/null
+++ b/docproc/src/test/vespa-configdef/string.def
@@ -0,0 +1,5 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+version=1
+
+namespace=config.docproc
+stringVal string default="_default_"