summaryrefslogtreecommitdiffstats
path: root/processing
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /processing
Publish
Diffstat (limited to 'processing')
-rw-r--r--processing/.gitignore2
-rw-r--r--processing/OWNERS1
-rw-r--r--processing/pom.xml42
-rw-r--r--processing/src/main/java/com/yahoo/processing/Processor.java42
-rw-r--r--processing/src/main/java/com/yahoo/processing/Request.java90
-rw-r--r--processing/src/main/java/com/yahoo/processing/Response.java162
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/AsyncExecution.java157
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/Execution.java487
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java36
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java17
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/RunnableExecution.java52
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java15
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/chain/package-info.java6
-rw-r--r--processing/src/main/java/com/yahoo/processing/execution/package-info.java6
-rw-r--r--processing/src/main/java/com/yahoo/processing/package-info.java10
-rw-r--r--processing/src/main/java/com/yahoo/processing/request/CompoundName.java287
-rw-r--r--processing/src/main/java/com/yahoo/processing/request/ErrorMessage.java217
-rw-r--r--processing/src/main/java/com/yahoo/processing/request/Properties.java573
-rw-r--r--processing/src/main/java/com/yahoo/processing/request/package-info.java6
-rw-r--r--processing/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java146
-rw-r--r--processing/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java15
-rw-r--r--processing/src/main/java/com/yahoo/processing/request/properties/package-info.java6
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/AbstractData.java30
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/AbstractDataList.java161
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/ArrayDataList.java130
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/Data.java22
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/DataList.java85
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java150
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/FutureResponse.java82
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/IncomingData.java219
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/Ordered.java18
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/Streamed.java21
-rw-r--r--processing/src/main/java/com/yahoo/processing/response/package-info.java6
-rw-r--r--processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java554
-rw-r--r--processing/src/main/java/com/yahoo/processing/test/Responses.java32
-rw-r--r--processing/src/test/java/com/yahoo/processing/ResponseTestCase.java139
-rw-r--r--processing/src/test/java/com/yahoo/processing/execution/test/AsyncExecutionTestCase.java46
-rw-r--r--processing/src/test/java/com/yahoo/processing/execution/test/ExecutionContextTestCase.java96
-rw-r--r--processing/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java173
-rw-r--r--processing/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java107
-rw-r--r--processing/src/test/java/com/yahoo/processing/request/CompoundNameTestCase.java158
-rw-r--r--processing/src/test/java/com/yahoo/processing/request/test/CompoundNameBenchmark.java52
-rw-r--r--processing/src/test/java/com/yahoo/processing/request/test/CompoundNameTestCase.java66
-rw-r--r--processing/src/test/java/com/yahoo/processing/request/test/ErrorMessageTestCase.java54
-rw-r--r--processing/src/test/java/com/yahoo/processing/request/test/PropertyMapTestCase.java81
-rw-r--r--processing/src/test/java/com/yahoo/processing/request/test/RequestTestCase.java137
-rw-r--r--processing/src/test/java/com/yahoo/processing/test/DocumentationTestCase.java44
-rw-r--r--processing/src/test/java/com/yahoo/processing/test/ProcessingTestCase.java60
-rw-r--r--processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java30
-rw-r--r--processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProducer.java37
-rw-r--r--processing/src/test/java/com/yahoo/processing/test/documentation/ExampleProcessor.java25
-rw-r--r--processing/src/test/java/com/yahoo/processing/test/documentation/Federator.java44
52 files changed, 5234 insertions, 0 deletions
diff --git a/processing/.gitignore b/processing/.gitignore
new file mode 100644
index 00000000000..3cc25b51fc4
--- /dev/null
+++ b/processing/.gitignore
@@ -0,0 +1,2 @@
+/pom.xml.build
+/target
diff --git a/processing/OWNERS b/processing/OWNERS
new file mode 100644
index 00000000000..31af040f698
--- /dev/null
+++ b/processing/OWNERS
@@ -0,0 +1 @@
+bratseth
diff --git a/processing/pom.xml b/processing/pom.xml
new file mode 100644
index 00000000000..3cc44b66e05
--- /dev/null
+++ b/processing/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>6-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>processing</artifactId>
+ <packaging>jar</packaging>
+ <version>6-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>provided-dependencies</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>chain</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>component</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/processing/src/main/java/com/yahoo/processing/Processor.java b/processing/src/main/java/com/yahoo/processing/Processor.java
new file mode 100644
index 00000000000..9339d5792b7
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/Processor.java
@@ -0,0 +1,42 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing;
+
+import com.yahoo.component.chain.ChainedComponent;
+import com.yahoo.processing.execution.Execution;
+
+/**
+ * Superclass of chainable components processing Requests to create Responses.
+ * <p>
+ * Processors typically changes the Request and/or the Response. It may also make multiple
+ * forward requests, in series or parallel, or manufacture the response content itself or by calling
+ * an external service.
+ * <p>
+ * Typical usage:
+ * <code>
+ * public class MyProcessor extends Processor {
+ *
+ * &#64;Override
+ * public Response process(Request request, Execution execution) {
+ * // process the request here
+ * Response response = execution.process(request); // Pass along to get the Response
+ * // process (or fill in) Data/DataList items on the response here
+ * return response;
+ * }
+ *
+ * }
+ * </code>
+ *
+ * @author bratseth
+ */
+public abstract class Processor extends ChainedComponent {
+
+ /**
+ * Performs a processing request and returns the response
+ *
+ * @return a Response instance - never null - containing the data produced by this processor
+ * and those it forwards the request to
+ */
+ public abstract Response process(Request request, Execution execution);
+
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/Request.java b/processing/src/main/java/com/yahoo/processing/Request.java
new file mode 100644
index 00000000000..6a8c4211c79
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/Request.java
@@ -0,0 +1,90 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing;
+
+import com.yahoo.component.provider.FreezableClass;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.processing.request.ErrorMessage;
+import com.yahoo.processing.request.Properties;
+import com.yahoo.processing.request.properties.PropertyMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A generic processing request.
+ * The request contains a set of properties that are used to communicate information from the client making the
+ * processing request (e.g http parameters), and as a blackboard to pass information between processors.
+ *
+ * @author bratseth
+ */
+public class Request extends FreezableClass implements Cloneable {
+
+ private Properties properties;
+
+ /**
+ * The errors encountered while processing this request
+ */
+ private List<ErrorMessage> errors = new ArrayList<>(0);
+
+ /**
+ * The name of the chain of Processor instances which will be invoked when
+ * executing a request.
+ */
+ public static final CompoundName CHAIN = new CompoundName("chain");
+
+ /**
+ * The name of the request property used in the processing framework to
+ * store the incoming JDisc request.
+ */
+ public static final CompoundName JDISC_REQUEST = new CompoundName("jdisc.request");
+
+ /**
+ * Creates a request with no properties
+ */
+ public Request() {
+ this(new PropertyMap());
+ }
+
+ /**
+ * Create a request with the given properties.
+ * This Request gains ownership of the given properties and may edit them in the future.
+ *
+ * @param properties the properties owner by this
+ */
+ public Request(Properties properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Returns the properties set on this request.
+ * Processors may add properties to send messages to downstream processors.
+ */
+ public Properties properties() {
+ return properties;
+ }
+
+ /**
+ * Returns the list of errors encountered while processing this request, never null.
+ * This is a live reference to the modifiable list of errors of this.
+ */
+ public List<ErrorMessage> errors() {
+ return errors;
+ }
+
+ /**
+ * Returns a clone of this request.
+ * <p>
+ * The properties are logically deeply cloned such that changes to properties in the clone are independent.
+ * <p>
+ * The errors of the original request <b>are not</b> cloned into the new instance:
+ * It will have an empty list of errors.
+ */
+ @Override
+ public Request clone() {
+ Request clone = (Request) super.clone();
+ clone.properties = properties.clone();
+ clone.errors = new ArrayList<>(0);
+ return clone;
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/Response.java b/processing/src/main/java/com/yahoo/processing/Response.java
new file mode 100644
index 00000000000..3805311efba
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/Response.java
@@ -0,0 +1,162 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.concurrent.SystemTimer;
+import com.yahoo.processing.execution.ResponseReceiver;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.processing.request.ErrorMessage;
+import com.yahoo.processing.response.ArrayDataList;
+import com.yahoo.processing.response.Data;
+import com.yahoo.processing.response.DataList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A Response to a Request.
+ * <p>
+ * A Response contains a list of Data items, which may (through Data implementations) contain payload data and/or
+ * further nested data lists.
+ * <p>
+ * Frameworks built on top of processing may subclass this to create a stricter definition of a response.
+ * Processors producing Responses should not create subclasses but should instead
+ * create additional instances/subclasses of Data. Such Processors should always create Response instances by calling
+ * execution.process(request), which will return an empty Response if there are no further processors in the chain.
+ *
+ * @author bratseth
+ */
+public class Response extends ListenableFreezableClass {
+
+ private final static CompoundName freezeListenerKey =new CompoundName("processing.freezeListener");
+
+ private final DataList<?> data;
+
+ /**
+ * Creates a request containing an empty array data list
+ */
+ public Response(Request request) {
+ this(ArrayDataList.create(request));
+ }
+
+ /**
+ * Creates a response containing a list of data
+ */
+ public Response(DataList<?> data) {
+ this.data = data;
+
+ Runnable freezeListener = null;
+ Request request = data.request();
+ if (request != null) // subclasses of DataList may not ensure this
+ freezeListener = (Runnable)request.properties().get(freezeListenerKey);
+ if (freezeListener != null) {
+ if (freezeListener instanceof ResponseReceiver)
+ ((ResponseReceiver)freezeListener).setResponse(this);
+ data.addFreezeListener(freezeListener, MoreExecutors.sameThreadExecutor());
+ }
+ }
+
+ /**
+ * Convenience constructor which adds the given error message to the given request
+ */
+ public Response(Request request, ErrorMessage errorMessage) {
+ this(ArrayDataList.create(request));
+ request.errors().add(errorMessage);
+ }
+
+ /**
+ * Processors which merges another request into this must call this method to notify the response.
+ * This does not modify the data of either response.
+ */
+ public void mergeWith(Response other) {
+ }
+
+ /**
+ * Returns the top level list of data items of this response
+ */
+ public DataList data() {
+ return data;
+ }
+
+ // ------ static utilities ----------------------------------------------------------------------------
+
+ /**
+ * Returns a future in which the given data list and all lists nested within it are completed.
+ * The only use of the returned future is to call a get() method on it to complete the given dataList and
+ * all dataLists nested below it recursively.
+ * <p>
+ * Lists are completed in prefix, depth-first order. DataLists added after the point when this method is called
+ * will not be completed.
+ *
+ * @param rootDataList the list to complete recursively
+ * @return the future in which all data in and below this list is complete, as the given root dataList for convenience
+ */
+ public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) {
+ List<ListenableFuture<DataList<D>>> futures = new ArrayList<>();
+ collectCompletionFutures(rootDataList, futures);
+ return new CompleteAllOnGetFuture<D>(futures);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<ListenableFuture<DataList<D>>> futures) {
+ futures.add((ListenableFuture<DataList<D>>) dataList.complete());
+ for (D data : dataList.asList()) {
+ if (data instanceof DataList)
+ collectCompletionFutures((DataList<D>) data, futures);
+ }
+ }
+
+ /**
+ * A future which on get calls get on all its given futures and sets the value returned from the
+ * first given future as its result.
+ */
+ private static class CompleteAllOnGetFuture<D extends Data> extends AbstractFuture<DataList<D>> {
+
+ private final List<ListenableFuture<DataList<D>>> futures;
+
+ public CompleteAllOnGetFuture(List<ListenableFuture<DataList<D>>> futures) {
+ this.futures = new ArrayList<>(futures);
+ }
+
+ @Override
+ public DataList<D> get() throws InterruptedException, ExecutionException {
+ DataList<D> result = null;
+ for (ListenableFuture<DataList<D>> future : futures) {
+ if (result == null)
+ result = future.get();
+ else
+ future.get();
+ }
+ set(result);
+ return result;
+ }
+
+ @Override
+ public DataList<D> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ DataList<D> result = null;
+ long timeLeft = unit.toMillis(timeout);
+ long currentCallStart = SystemTimer.INSTANCE.milliTime();
+ for (ListenableFuture<DataList<D>> future : futures) {
+ if (result == null)
+ result = future.get(timeLeft, TimeUnit.MILLISECONDS);
+ else
+ future.get(timeLeft, TimeUnit.MILLISECONDS);
+ long currentCallEnd = SystemTimer.INSTANCE.milliTime();
+ timeLeft -= (currentCallEnd - currentCallStart);
+ if (timeLeft <= 0) break;
+ currentCallStart = currentCallEnd;
+ }
+ set(result);
+ return result;
+ }
+
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/execution/AsyncExecution.java b/processing/src/main/java/com/yahoo/processing/execution/AsyncExecution.java
new file mode 100644
index 00000000000..7915fbedb0d
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/AsyncExecution.java
@@ -0,0 +1,157 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.request.ErrorMessage;
+import com.yahoo.processing.response.FutureResponse;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Provides asynchronous execution of processing chains. Usage:
+ *
+ * <pre>
+ * Execution execution = new Execution(chain);
+ * AsyncExecution asyncExecution = new AsyncExecution(execution);
+ * Future&lt;Response&gt; future = asyncExecution.process(request)
+ * try {
+ * result = future.get(timeout, TimeUnit.milliseconds);
+ * } catch(TimeoutException e) {
+ * // Handle timeout
+ * }
+ * </pre>
+ *
+ * <p>
+ * The request is not thread safe. A clone() must be made for each parallel processing.
+ * </p>
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ * @see Execution
+ */
+public class AsyncExecution {
+
+ private static final ThreadFactory threadFactory = ThreadFactoryFactory.getThreadFactory("processing");
+
+ private static final Executor executorMain = createExecutor();
+ private static final Executor createExecutor() {
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(100,
+ Integer.MAX_VALUE, 1L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(false), threadFactory);
+ // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also
+ // get the dreaded thread locals initialized even if they will never run.
+ // That counters what we we want to achieve with the Q that will prefer thread locality.
+ executor.prestartAllCoreThreads();
+ return executor;
+ }
+
+ /**
+ * The execution of this
+ */
+ private final Execution execution;
+
+ /**
+ * Create an async execution of a single processor
+ */
+ public AsyncExecution(final Processor processor, Execution parent) {
+ this(new Execution(processor, parent));
+ }
+
+ /**
+ * Create an async execution of a chain
+ */
+ public AsyncExecution(final Chain<? extends Processor> chain, Execution parent) {
+ this(new Execution(chain, parent));
+ }
+
+ /**
+ * Creates an async execution from an existing execution. This async
+ * execution will execute the chain from the given execution, starting
+ * from the next processor in that chain. This is handy to execute
+ * multiple executions to the rest of the chain in parallel.
+ * <p>
+ * The state of the given execution is read on construction of this and not
+ * used later - the argument execution can be reused for other purposes.
+ *
+ * @param execution the execution from which the state of this is created
+ */
+ public AsyncExecution(final Execution execution) {
+ this.execution = new Execution(execution);
+ }
+
+ /**
+ * Performs an async processing. Note that the given request cannot be simultaneously
+ * used in multiple such processings - a clone must be created for each.
+ */
+ public FutureResponse process(final Request request) {
+ return getFutureResponse(new Callable<Response>() {
+ @Override
+ public Response call() {
+ return execution.process(request);
+ }
+ }, request);
+ }
+
+ private static <T> Future<T> getFuture(final Callable<T> callable) {
+ final FutureTask<T> future = new FutureTask<>(callable);
+ executorMain.execute(future);
+ return future;
+ }
+
+ private FutureResponse getFutureResponse(final Callable<Response> callable, final Request request) {
+ final FutureResponse future = new FutureResponse(callable, execution, request);
+ executorMain.execute(future.delegate());
+ return future;
+ }
+
+ /*
+ * Waits for all futures until the given timeout. If a FutureResult isn't
+ * done when the timeout expires, it will be cancelled, and it will return a
+ * response. All unfinished Futures will be cancelled.
+ *
+ * @return the list of responses in the same order as returned from the task collection
+ */
+ // Note that this may also be achieved using guava Futures. Not sure if this should be deprecated because of it.
+ public static List<Response> waitForAll(final Collection<FutureResponse> tasks, final long timeout) {
+
+ // Copy the list in case it is modified while we are waiting
+ final List<FutureResponse> workingTasks = new ArrayList<>(tasks);
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ final Future task = getFuture(new Callable() {
+ @Override
+ public List<Future> call() {
+ for (final FutureResponse task : workingTasks) {
+ task.get();
+ }
+ return null;
+ }
+ });
+
+ try {
+ task.get(timeout, TimeUnit.MILLISECONDS);
+ } catch (final TimeoutException | InterruptedException | ExecutionException e) {
+ // Handle timeouts below
+ }
+
+ final List<Response> responses = new ArrayList<>(tasks.size());
+ for (final FutureResponse future : workingTasks)
+ responses.add(getTaskResponse(future));
+ return responses;
+ }
+
+ private static Response getTaskResponse(FutureResponse future) {
+ if (future.isDone() && !future.isCancelled()) {
+ return future.get(); // Since isDone() = true, this won't block.
+ } else { // Not done and no errors thrown
+ return new Response(future.getRequest(), new ErrorMessage("Timed out waiting for " + future));
+ }
+ }
+
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/execution/Execution.java b/processing/src/main/java/com/yahoo/processing/execution/Execution.java
new file mode 100644
index 00000000000..c8bc291c036
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/Execution.java
@@ -0,0 +1,487 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution;
+
+import com.yahoo.collections.Pair;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.chain.ChainRegistry;
+import com.yahoo.yolean.trace.TraceNode;
+import com.yahoo.yolean.trace.TraceVisitor;
+
+import java.util.Iterator;
+
+/**
+ * An execution of a chain. This keeps tracks of the progress of the execution and is called by the
+ * processors (using {@link #process} to move the execution to the next one.
+ *
+ * @author <a href="bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class Execution {
+
+ /**
+ * The index of the searcher in the chain which should be executed on the next call
+ * An Execution instance contains the state of a chain execution by
+ * providing a class stack for a chain - when a processor is called
+ * (through this), it will increment the index of the processor to call next,
+ * each time a processor returns (regardless of how) it will do the opposite.
+ */
+ private int processorIndex;
+
+ private final Chain<? extends Processor> chain;
+
+ private final Trace trace;
+
+ private final Environment<? extends Processor> environment;
+
+ /**
+ * Creates an execution of a single processor
+ *
+ * @param processor the processor to execute in this
+ * @param execution the parent execution of this
+ */
+ public Execution(Processor processor, Execution execution) {
+ this(new Chain<>(processor), execution);
+ }
+
+ /** Creates an execution of a single processor which is not in the context of an existing execution */
+ public static Execution createRoot(Processor processor, int traceLevel, Environment<? extends Processor> environment) {
+ return createRoot(new Chain<>(processor), traceLevel, environment);
+ }
+
+ /**
+ * Creates an execution which is not in the context of an existing execution
+ *
+ * @param chain the chain to execute
+ * @param traceLevel the level to emit trace at
+ * @param environment the execution environment to use
+ */
+ public static Execution createRoot(Chain<? extends Processor> chain, int traceLevel,
+ Environment<? extends Processor> environment) {
+ return new Execution(chain, 0, Trace.createRoot(traceLevel), environment);
+ }
+
+ /**
+ * Creates an execution of a chain
+ *
+ * @param chain the chain to execute in this, starting from the first processor
+ * @param execution the parent execution of this
+ */
+ public Execution(Chain<? extends Processor> chain, Execution execution) {
+ this(chain, 0, execution.trace().createChild(), execution.environment().nested());
+ }
+
+ /**
+ * Creates an execution from another. This execution will start at the next processor of the
+ * given execution. The given execution can continue independently of this.
+ */
+ public Execution(Execution startPoint) {
+ this(startPoint.chain, startPoint.processorIndex, startPoint.trace.createChild(), startPoint.environment().nested());
+ }
+
+ /**
+ * Creates a new execution by setting the internal state directly.
+ *
+ * @param chain the chain to execute
+ * @param startIndex the start index into that chain
+ * @param trace the context <b>of this</b>. If this is created from an existing execution/context,
+ * be sure to do <code> new Context&lt;COMPONENT&gt;(startPoint.context)</code> first!
+ * @param environment the static execution environment to use
+ */
+ protected Execution(Chain<? extends Processor> chain, int startIndex, Trace trace, Environment<? extends Processor> environment) {
+ if (chain == null) throw new NullPointerException("Chain cannot be null");
+ this.chain = chain;
+ this.processorIndex = startIndex;
+ this.trace = trace;
+ this.environment = environment;
+ }
+
+ /**
+ * Calls process on the next processor in this chain. If there is no next, an empty response is returned.
+ */
+ public Response process(Request request) {
+ Processor processor = next();
+ if (processor == null)
+ return defaultResponse(request);
+
+ Response response = null;
+ try {
+ nextProcessor();
+ onInvoking(request, processor);
+ response = processor.process(request, this);
+ if (response == null)
+ throw new NullPointerException(processor + " returned null, not a Response object");
+ return response;
+ } finally {
+ previousProcessor();
+ onReturning(request, processor, response);
+ }
+ }
+
+ /**
+ * Returns the index into the chain of processors which is currently next
+ */
+ protected int nextIndex() {
+ return processorIndex;
+ }
+
+ /**
+ * A hook called when a processor is to be invoked. Overriding methods must call super.onInvoking
+ */
+ protected void onInvoking(Request request, Processor next) {
+ if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) {
+ int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel();
+ trace.trace("Invoke " + next, traceAt);
+ }
+ if (Trace.Level.Dependencies.includes(trace.getTraceLevel()))
+ trace.trace(next.getId() + " " + next.getDependencies().toString(), trace.getTraceLevel());
+ }
+
+ /**
+ * A hook called when a processor returns, either normally or by throwing.
+ * Overriding methods must call super.onReturning
+ *
+ * @param request the processing request
+ * @param processor the processor which returned
+ * @param response the response returned, or null if the processor returned by throwing
+ */
+ protected void onReturning(Request request, Processor processor, Response response) {
+ if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) {
+ int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel();
+ trace.trace("Return " + processor, traceAt);
+ }
+ }
+
+ /** Move this execution to the previous processor */
+ protected void previousProcessor() {
+ processorIndex--;
+ }
+
+ /** Move this execution to the next processor */
+ protected void nextProcessor() {
+ processorIndex++;
+ }
+
+ /** Returns the next searcher to be invoked in this chain, or null if we are at the last */
+ protected Processor next() {
+ if (chain.components().size() <= processorIndex) return null;
+ return chain.components().get(processorIndex);
+ }
+
+ /** Returns the chain this executes */
+ public Chain<? extends Processor> chain() {
+ return chain;
+ }
+
+ /**
+ * Creates the default response to return from this kind of execution when there are no further processors.
+ * If this is overridden, make sure to propagate any freezeListener from this to the returned response
+ * top-level DataList.
+ */
+ protected Response defaultResponse(Request request) {
+ return new Response(request);
+ }
+
+ public String toString() {
+ StringBuilder s = new StringBuilder("Execution(");
+ s.append(chain.getId());
+ s.append(")#").append(hashCode());
+ return s.toString();
+ }
+
+ public Trace trace() {
+ return trace;
+ }
+
+ public Environment<? extends Processor> environment() {
+ return environment;
+ }
+
+ /**
+ * Holds the static execution environment for the duration of an execution
+ */
+ public static class Environment<COMPONENT extends Processor> {
+
+ private final ChainRegistry<COMPONENT> chainRegistry;
+
+ /**
+ * Creates an empty environment. Only useful for some limited testing
+ */
+ public static <C extends Processor> Environment<C> createEmpty() {
+ return new Environment<C>(new ChainRegistry<C>());
+ }
+
+ /**
+ * Returns an environment for an execution spawned from the execution having this environment.
+ */
+ public Environment<COMPONENT> nested() {
+ return this; // this is immutable, subclasses might want to do something else though
+ }
+
+ /**
+ * Creates a new environment
+ */
+ public Environment(ChainRegistry<COMPONENT> chainRegistry) {
+ this.chainRegistry = chainRegistry;
+ }
+
+ /**
+ * Returns the processing chain registry of this execution environment.
+ * The registry may be empty, but never null.
+ */
+ public ChainRegistry<COMPONENT> chainRegistry() {
+ return chainRegistry;
+ }
+
+ }
+
+ /**
+ * Tre trace of this execution. This is a facade into a node in the larger trace tree which captures
+ * the information about all executions caused by some request
+ *
+ * @author <a href="bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+ public static class Trace {
+
+ /**
+ * The node in the trace tree capturing this execution
+ */
+ private TraceNode traceNode;
+
+ /**
+ * The highest level of tracing this should record
+ */
+ private int traceLevel;
+
+ /**
+ * If true, do timing logic, even though trace level is low.
+ */
+ private boolean forceTimestamps = false;
+
+ /**
+ * Creates an empty root trace with a given level of tracing
+ */
+ public static Trace createRoot(int traceLevel) {
+ return new Trace(traceLevel, new TraceNode(null, timestamp(traceLevel, false)), false);
+ }
+
+ /**
+ * Creates a trace node below a parent
+ */
+ public Trace createChild() {
+ TraceNode child = new TraceNode(null, timestamp(traceLevel, forceTimestamps));
+ traceNode.add(child);
+ return new Trace(getTraceLevel(), child, forceTimestamps);
+ }
+
+ /**
+ * Creates a new instance by assigning the internal state of this directly
+ */
+ private Trace(int traceLevel, TraceNode traceNode, boolean forceTimestamps) {
+ this.traceLevel = traceLevel;
+ this.traceNode = traceNode;
+ this.forceTimestamps = forceTimestamps;
+ }
+
+ /**
+ * Returns the maximum trace level this will record
+ */
+ public int getTraceLevel() {
+ return traceLevel;
+ }
+
+ /**
+ * Sets the maximum trace level this will record
+ */
+ public void setTraceLevel(int traceLevel) {
+ this.traceLevel = traceLevel;
+ }
+
+ public void setForceTimestamps(boolean forceTimestamps) {
+ this.forceTimestamps = forceTimestamps;
+ }
+
+ public boolean getForceTimestamps() {
+ return forceTimestamps;
+ }
+
+ /**
+ * Adds a trace message to this trace, if this trace has at most the given trace level
+ */
+ public void trace(String message, int traceLevel) {
+ if (this.traceLevel >= traceLevel) {
+ traceNode.add(new TraceNode(message, timestamp(traceLevel, forceTimestamps)));
+ }
+ }
+
+ /**
+ * Adds a key-value which will be logged to the access log of this request.
+ * Multiple values may be set to the same key. A value cannot be removed once set,
+ * but it can be overwritten by adding another value for the same key.
+ */
+ public void logValue(String key, String value) {
+ traceNode.add(new TraceNode(new LogValue(key, value), 0));
+ }
+
+ /**
+ * Returns the values that should be written to the access log set in the entire trace node tree
+ */
+ public Iterator<LogValue> logValueIterator() {
+ return traceNode.root().descendants(LogValue.class).iterator();
+ }
+
+ /**
+ * Visits the entire trace tree
+ *
+ * @return the argument visitor for convenience
+ */
+ public <VISITOR extends TraceVisitor> VISITOR accept(VISITOR visitor) {
+ return traceNode.root().accept(visitor);
+ }
+
+ /**
+ * Adds a property key-value to this trace.
+ * Values are looked up by reverse depth-first search in the trace node tree.
+ *
+ * @param name the name of the property
+ * @param value the value of the property, or null to set this property to null
+ */
+ public void setProperty(String name, Object value) {
+ traceNode.add(new TraceNode(new Pair<>(name, value), 0));
+ }
+
+ /**
+ * Returns a property set anywhere in the trace tree this points to.
+ * Note that even though this call is itself "thread robust", the object values returned
+ * may in some scenarios not be written behind a synchronization barrier, so when accessing
+ * objects which are not inherently thread safe, synchronization should be considered.
+ * <p>
+ * This method have a time complexity which is proportional to
+ * the number of trace nodes in the tree
+ *
+ * @return the value of this property, or null if none
+ */
+ public Object getProperty(String name) {
+ return accept(new PropertyValueVisitor(name)).foundValue();
+ }
+
+ /**
+ * Returns the trace node peer of this
+ */
+ public TraceNode traceNode() {
+ return traceNode;
+ }
+
+ /**
+ * Returns a short string description of this
+ */
+ @Override
+ public String toString() {
+ return "trace: " + traceNode;
+ }
+
+ private static long timestamp(int traceLevel, boolean forceTimestamps) {
+ return (forceTimestamps || Level.Timestamp.includes(traceLevel)) ? System.currentTimeMillis() : 0;
+ }
+
+ /**
+ * Visits all trace nodes to collect the last set value of a particular property in a trace tree
+ */
+ private static class PropertyValueVisitor extends TraceVisitor {
+
+ /**
+ * The name of the property to find
+ */
+ private String name;
+ private Object foundValue = null;
+
+ public PropertyValueVisitor(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public void visit(TraceNode node) {
+ if (node.payload() == null) return;
+ if (!(node.payload() instanceof Pair)) return;
+
+ Pair property = (Pair) node.payload();
+ if (!property.getFirst().equals(name)) return;
+ foundValue = property.getSecond();
+ }
+
+ public Object foundValue() {
+ return foundValue;
+ }
+
+ }
+
+ /**
+ * An immutable access log value added to the trace
+ */
+ public static class LogValue {
+
+ private final String key;
+ private final String value;
+
+ public LogValue(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return key + "=" + value;
+ }
+
+ }
+
+ /**
+ * Defines what information is added at which trace level
+ */
+ public enum Level {
+
+ /**
+ * Every processing step initiated is traced
+ */
+ Step(4),
+ /**
+ * All trace messages are timestamped
+ */
+ Timestamp(6),
+ /**
+ * The before/after dependencies of each processing step is traced on every invocation
+ */
+ Dependencies(7);
+
+ /**
+ * The smallest trace level at which this information will be traced
+ */
+ private int value;
+
+ Level(int value) {
+ this.value = value;
+ }
+
+ public int value() {
+ return value;
+ }
+
+ /**
+ * Returns whether this level includes the given level, i.e whether traceLevel is this.value() or more
+ */
+ public boolean includes(int traceLevel) {
+ return traceLevel >= this.value();
+ }
+
+ }
+ }
+}
diff --git a/processing/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java b/processing/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java
new file mode 100644
index 00000000000..ceb5d4f7ccc
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java
@@ -0,0 +1,36 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+
+/**
+ * An execution which has a response which is returned when this gets to the end of the chain.
+ * This is useful to run processing chains where a response exists up front, typically for on completion listeners.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class ExecutionWithResponse extends Execution {
+
+ private Response response;
+
+ /**
+ * Creates an execution which will return a given response at the end of the chain.
+ *
+ * @param chain the chain to execute in this
+ * @param response the response this will return from {@link #process} then the end of this chain is reached
+ * @param execution the the parent of this execution
+ */
+ public ExecutionWithResponse(Chain<? extends Processor> chain, Response response, Execution execution) {
+ super(chain, execution);
+ this.response = response;
+ }
+
+ @Override
+ protected Response defaultResponse(Request request) {
+ return response;
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java b/processing/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java
new file mode 100644
index 00000000000..fbc6d0fb6d8
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java
@@ -0,0 +1,17 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution;
+
+import com.yahoo.processing.Response;
+
+/**
+ * An interface for classes which can be given responses.
+ * Freeze listeners may implement this to be handed the response
+ * before they are run. There is probably no other sensible use for this.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public interface ResponseReceiver {
+
+ public void setResponse(Response response);
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/execution/RunnableExecution.java b/processing/src/main/java/com/yahoo/processing/execution/RunnableExecution.java
new file mode 100644
index 00000000000..617a2b98b03
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/RunnableExecution.java
@@ -0,0 +1,52 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution;
+
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+
+/**
+ * An adaptor of an Execution to a runnable. Calling run on this causes process to be called on the
+ * given processor.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class RunnableExecution implements Runnable {
+
+ private final Request request;
+ private final Execution execution;
+ private Response response = null;
+ private Throwable exception = null;
+
+ public RunnableExecution(Request request, Execution execution) {
+ this.request = request;
+ this.execution = execution;
+ }
+
+ /**
+ * Calls process on the execution of this.
+ * This will result in either response or exception being set on this.
+ * Calling this never throws an exception.
+ */
+ public void run() {
+ try {
+ response = execution.process(request);
+ } catch (Exception e) {
+ exception = e; // TODO: Log
+ }
+ }
+
+ /**
+ * Returns the response from executing this, or null if exception is set or run() has not been called yet
+ */
+ public Response getResponse() {
+ return response;
+ }
+
+ /**
+ * Returns the exception from executing this, or null if response is set or run() has not been called yet
+ */
+ public Throwable getException() {
+ return exception;
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java b/processing/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java
new file mode 100644
index 00000000000..e31b9a8b098
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution.chain;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.component.chain.ChainedComponent;
+import com.yahoo.component.provider.ComponentRegistry;
+
+/**
+ * A registry of chains
+ *
+ * @author tonytv
+ * @since 5.1.7
+ */
+public class ChainRegistry<T extends ChainedComponent> extends ComponentRegistry<Chain<T>> {
+}
diff --git a/processing/src/main/java/com/yahoo/processing/execution/chain/package-info.java b/processing/src/main/java/com/yahoo/processing/execution/chain/package-info.java
new file mode 100644
index 00000000000..e2f299fa186
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/chain/package-info.java
@@ -0,0 +1,6 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi package com.yahoo.processing.execution.chain;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/processing/src/main/java/com/yahoo/processing/execution/package-info.java b/processing/src/main/java/com/yahoo/processing/execution/package-info.java
new file mode 100644
index 00000000000..c3a3074bfb4
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/execution/package-info.java
@@ -0,0 +1,6 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi package com.yahoo.processing.execution;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/processing/src/main/java/com/yahoo/processing/package-info.java b/processing/src/main/java/com/yahoo/processing/package-info.java
new file mode 100644
index 00000000000..4f0b2d4e724
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/package-info.java
@@ -0,0 +1,10 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi package com.yahoo.processing;
+
+// TODO:
+// - Look through all instances where we pass executor and consider if we should allow the caller to decide the thread
+// - Should data listener use a typed interface rather than runnable`
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/processing/src/main/java/com/yahoo/processing/request/CompoundName.java b/processing/src/main/java/com/yahoo/processing/request/CompoundName.java
new file mode 100644
index 00000000000..798d2d11f89
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/request/CompoundName.java
@@ -0,0 +1,287 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.yahoo.text.Lowercase.toLowerCase;
+
+/**
+ * An immutable compound name of the general form "a.bb.ccc",
+ * where there can be any number of such compounds, including one or zero.
+ * <p>
+ * Using CompoundName is generally substantially faster than using strings.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public final class CompoundName {
+
+ /**
+ * The string name of this compound.
+ */
+ private final String name;
+ private final String lowerCasedName;
+
+ private final ImmutableList<String> compounds;
+
+ /** A hashcode which is always derived from the compounds (NEVER the string) */
+ private final int hashCode;
+
+ /** This name with the first component removed */
+ private final CompoundName rest;
+
+ /** The empty compound */
+ public static final CompoundName empty = new CompoundName("");
+
+ /**
+ * Constructs this from a string which may contains dot-separated components
+ *
+ * @throws NullPointerException if name is null
+ */
+ public CompoundName(final String name) {
+ this(name, parse(name));
+ }
+
+ /** Constructs this from an array of name components which are assumed not to contain dots */
+ public static CompoundName fromComponents(String ... components) {
+ return new CompoundName(Arrays.asList(components));
+ }
+
+ /** Constructs this from a list of compounds. */
+ public CompoundName(List<String> compounds) {
+ this(toCompoundString(compounds), compounds);
+ }
+
+ /**
+ * Constructs this from a name with already parsed compounds.
+ * Private to avoid creating names with inconsistencies.
+ *
+ * @param name the string representation of the compounds
+ * @param compounds the compounds of this name
+ */
+ private CompoundName(String name, List<String> compounds) {
+ if (name == null) throw new NullPointerException("Name can not be null");
+
+ this.name = name;
+ this.lowerCasedName = toLowerCase(name);
+ if (compounds.size()==1 && compounds.get(0).isEmpty())
+ this.compounds = ImmutableList.of();
+ else
+ this.compounds = ImmutableList.copyOf(compounds);
+ this.hashCode = this.compounds.hashCode();
+
+ int size = this.compounds.size();
+ rest = size > 1 ? new CompoundName(compounds.subList(1, size))
+ : size == 1 ? empty : this; // size==0 -> this needed during construction of empty
+ }
+
+ private static List<String> parse(String s) {
+ ArrayList<String> l = new ArrayList<>();
+ int p = 0;
+ final int m = s.length();
+ for (int i = 0; i < m; i++) {
+ if (s.charAt(i) == '.') {
+ l.add(s.substring(p, i));
+ p = i + 1;
+ }
+ }
+ if (p == 0) {
+ l.add(s);
+ } else if (p < m) {
+ l.add(s.substring(p, m));
+ } else {
+ throw new IllegalArgumentException("'" + s + "' is not a legal compound name. Names can not end with a dot.");
+ }
+ return l;
+ }
+
+ /**
+ * Returns a compound name which has the given compound string appended to it
+ *
+ * @param name if name is empty this returns <code>this</code>
+ */
+ public CompoundName append(String name) {
+ if (name.isEmpty()) return this;
+ if (isEmpty()) return new CompoundName(name);
+ List<String> newCompounds = new ArrayList<>(compounds);
+ newCompounds.addAll(parse(name));
+ return new CompoundName(concat(this.name, name), newCompounds);
+ }
+
+ /**
+ * Returns a compound name which has the given compounds appended to it
+ *
+ * @param name if name is empty this returns <code>this</code>
+ */
+ public CompoundName append(CompoundName name) {
+ if (name.isEmpty()) return this;
+ if (isEmpty()) return name;
+ List<String> newCompounds = new ArrayList<>(compounds);
+ newCompounds.addAll(name.compounds);
+ return new CompoundName(concat(this.name, name.name), newCompounds);
+ }
+
+ private String concat(String name1, String name2) {
+ return name1 + "." + name2;
+ }
+
+ /**
+ * Returns a compound name which has the given name components prepended to this name,
+ * in the given order, i.e new ComponentName("c").prepend("a","b") will yield "a.b.c".
+ *
+ * @param nameParts if name is empty this returns <code>this</code>
+ */
+ public CompoundName prepend(String ... nameParts) {
+ if (nameParts.length == 0) return this;
+ if (isEmpty()) return fromComponents(nameParts);
+
+ List<String> newCompounds = new ArrayList<>();
+ for (String namePart : nameParts)
+ newCompounds.add(namePart);
+ newCompounds.addAll(this.compounds);
+ return new CompoundName(newCompounds);
+ }
+
+ /**
+ * Returns the name after the last dot. If there are no dots, the full name is returned.
+ */
+ public String last() {
+ if (compounds.isEmpty()) return "";
+ return compounds.get(compounds.size() - 1);
+ }
+
+ /**
+ * Returns the name before the first dot. If there are no dots the full name is returned.
+ */
+ public String first() {
+ if (compounds.isEmpty()) return "";
+ return compounds.get(0);
+ }
+
+ /**
+ * Returns the first n components of this.
+ *
+ * @throws IllegalArgumentException if this does not have at least n components
+ */
+ public CompoundName first(int n) {
+ if (compounds.size() < n)
+ throw new IllegalArgumentException("Asked for the first " + n + " components but '" +
+ this + "' only have " + compounds.size() + " components.");
+ return new CompoundName(compounds.subList(0, n));
+ }
+
+ /**
+ * Returns the name after the first dot, or "" if this name has no dots
+ */
+ public CompoundName rest() { return rest; }
+
+ /**
+ * Returns the name starting after the n first components (i.e dots).
+ * This may be the empty name.
+ *
+ * @throws IllegalArgumentException if this does not have at least that many components
+ */
+ public CompoundName rest(int n) {
+ if (n == 0) return this;
+ if (compounds.size() < n)
+ throw new IllegalArgumentException("Asked for the rest after " + n + " components but '" +
+ this + "' only have " + compounds.size() + " components.");
+ if (n == 1) return rest();
+ if (compounds.size() == n) return empty;
+ return rest.rest(n-1);
+ }
+
+ /**
+ * Returns the number of compound elements in this. Which is exactly the number of dots in the string plus one.
+ * The size of an empty compound is 0.
+ */
+ public int size() {
+ return compounds.size();
+ }
+
+ /**
+ * Returns the compound element as the given index
+ */
+ public String get(int i) {
+ return compounds.get(i);
+ }
+
+ /**
+ * Returns a compound which have the name component at index i set to the given name.
+ * As an optimization, if the given name == the name component at this index, this is returned.
+ */
+ public CompoundName set(int i, String name) {
+ if (get(i) == name) return this;
+ List<String> newCompounds = new ArrayList<>(compounds);
+ newCompounds.set(i, name);
+ return new CompoundName(newCompounds);
+ }
+
+ /**
+ * Returns whether this name has more than one element
+ */
+ public boolean isCompound() {
+ return compounds.size() > 1;
+ }
+
+ public boolean isEmpty() {
+ return compounds.isEmpty();
+ }
+
+ /**
+ * Returns whether the given name is a prefix of this.
+ * Prefixes are taken on the component, not character level, so
+ * "a" is a prefix of "a.b", but not a prefix of "ax.b
+ */
+ public boolean hasPrefix(CompoundName prefix) {
+ if (prefix.size() > this.size()) return false;
+
+ int prefixLength = prefix.name.length();
+ if (prefixLength == 0)
+ return true;
+
+ if (name.length() > prefixLength && name.charAt(prefixLength) != '.')
+ return false;
+
+ return name.startsWith(prefix.name);
+ }
+
+ /**
+ * Returns an immutable list of the components of this
+ */
+ public List<String> asList() {
+ return compounds;
+ }
+
+ @Override
+ public int hashCode() { return hashCode; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if ( ! (o instanceof CompoundName)) return false;
+ CompoundName other = (CompoundName)o;
+ return this.name.equals(other.name);
+ }
+
+ /**
+ * Returns the string representation of this - all the name components in order separated by dots.
+ */
+ @Override
+ public String toString() { return name; }
+
+ public String getLowerCasedName() {
+ return lowerCasedName;
+ }
+
+ private static String toCompoundString(List<String> compounds) {
+ StringBuilder b = new StringBuilder();
+ for (String compound : compounds)
+ b.append(compound).append(".");
+ return b.length()==0 ? "" : b.substring(0, b.length()-1);
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/request/ErrorMessage.java b/processing/src/main/java/com/yahoo/processing/request/ErrorMessage.java
new file mode 100644
index 00000000000..15ef23b2d62
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/request/ErrorMessage.java
@@ -0,0 +1,217 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request;
+
+/**
+ * An error encountered while processing a request.
+ * This can be subclassed to add error messages containing more information.
+ * <p>
+ * Error messages are immutable.
+ *
+ * @author bratseth
+ */
+public class ErrorMessage implements Cloneable {
+
+ private final int code;
+ private final String message;
+ private final String detailedMessage;
+ private final Throwable cause;
+
+ /**
+ * Creates an error
+ *
+ * @param message the textual message describing this condition tersely
+ */
+ public ErrorMessage(String message) {
+ this(0, message, null, null);
+ }
+
+ /**
+ * Creates an error
+ *
+ * @param message the textual message describing this condition tersely
+ * @param code an error code. If this is bound to HTTP request/responses and
+ * this error code is a HTTP status code, this code will be returned as the HTTP status
+ */
+ public ErrorMessage(int code, String message) {
+ this(code, message, null, null);
+ }
+
+ /**
+ * Creates an error
+ *
+ * @param message the textual message describing this condition tersely
+ * @param details a longer detail description of this condition
+ */
+ public ErrorMessage(String message, String details) {
+ this(0, message, details, null);
+ }
+
+ /**
+ * Creates an error
+ *
+ * @param message the textual message describing this condition tersely
+ * @param code an error code. If this is bound to HTTP request/responses and
+ * this error code is a HTTP status code, this code will be returned as the HTTP status
+ * @param details a longer detail description of this condition
+ */
+ public ErrorMessage(int code, String message, String details) {
+ this(code, message, details, null);
+ }
+
+ /**
+ * Creates an error
+ *
+ * @param message the textual message describing this condition tersely
+ * @param cause the cause of this error
+ */
+ public ErrorMessage(String message, Throwable cause) {
+ this(0, message, null, cause);
+ }
+
+ /**
+ * Creates an error
+ *
+ * @param code an error code. If this is bound to HTTP request/responses and
+ * this error code is a HTTP status code, this code will be returned as the HTTP status
+ * @param message the textual message describing this condition tersely
+ * @param cause the cause of this error
+ */
+ public ErrorMessage(int code, String message, Throwable cause) {
+ this(code, message, null, cause);
+ }
+
+ /**
+ * Creates an error
+ *
+ * @param message the textual message describing this condition tersely
+ * @param details a longer detail description of this condition
+ * @param cause the cause of this error
+ */
+ public ErrorMessage(String message, String details, Throwable cause) {
+ this(0, message, details, cause);
+ }
+
+ /**
+ * Creates an error
+ *
+ * @param code an error code. If this is bound to HTTP request/responses and
+ * this error code is a HTTP status code, this code will be returned as the HTTP status
+ * @param message the textual message describing this condition tersely
+ * @param details a longer detail description of this condition
+ * @param cause the cause of this error
+ */
+ public ErrorMessage(int code, String message, String details, Throwable cause) {
+ if (message == null) throw new NullPointerException("Message cannot be null");
+ this.code = code;
+ this.message = message;
+ this.detailedMessage = details;
+ this.cause = cause;
+ }
+
+ /**
+ * Returns the code of this message, or 0 if no code is set
+ */
+ public int getCode() {
+ return code;
+ }
+
+ /**
+ * Returns the error message, never null
+ */
+ public String getMessage() {
+ return message;
+ }
+
+ /**
+ * Returns detailed information about this error, or null if there is no detailed message
+ */
+ public String getDetailedMessage() {
+ return detailedMessage;
+ }
+
+ /**
+ * Returns the throwable associated with this error, or null if none
+ */
+ public Throwable getCause() {
+ return cause;
+ }
+
+ /**
+ * Returns a formatted message containing the information in this
+ */
+ @Override
+ public String toString() {
+ if (code == 0 && detailedMessage == null && cause == null) return message; // shortcut
+ StringBuilder b = new StringBuilder();
+ if (code != 0)
+ b.append(code).append(": ");
+ b.append(message);
+ if (detailedMessage != null)
+ b.append(": ").append(detailedMessage);
+ if (cause != null)
+ append(cause, b);
+ return b.toString();
+ }
+
+ private void append(Throwable t, StringBuilder b) {
+ String lastMessage = null;
+ String message;
+ for (; t != null; t = t.getCause(), lastMessage = message) {
+ message = getMessage(t);
+ if (message == null) continue;
+ if (lastMessage != null && lastMessage.equals(message)) continue;
+ if (b.length() > 0)
+ b.append(": ");
+ b.append(message);
+ }
+ }
+
+ /**
+ * Returns a useful message from *this* exception, or null if none
+ */
+ private static String getMessage(Throwable t) {
+ String message = t.getMessage();
+ if (t.getCause() == null) {
+ if (message == null) return t.getClass().getSimpleName();
+ } else {
+ if (message == null) return null;
+ if (message.equals(t.getCause().getClass().getName() + ": " + t.getCause().getMessage())) return null;
+ }
+ return message;
+ }
+
+ @Override
+ public int hashCode() {
+ return code * 7 + message.hashCode() + (detailedMessage == null ? 0 : 17 * detailedMessage.hashCode());
+ }
+
+ /**
+ * Two error messages are equal if they have the same code and message.
+ * The cause is ignored in the comparison.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof ErrorMessage)) return false;
+
+ ErrorMessage other = (ErrorMessage) o;
+
+ if (this.code != other.code) return false;
+
+ if (!this.message.equals(other.message)) return false;
+
+ if (this.detailedMessage == null) return other.detailedMessage == null;
+ if (other.detailedMessage == null) return false;
+
+ return this.detailedMessage.equals(other.detailedMessage);
+ }
+
+ @Override
+ public ErrorMessage clone() {
+ try {
+ return (ErrorMessage) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Programming error");
+ }
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/request/Properties.java b/processing/src/main/java/com/yahoo/processing/request/Properties.java
new file mode 100644
index 00000000000..2c603b6c7a0
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/request/Properties.java
@@ -0,0 +1,573 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The properties of a request
+ *
+ * @author bratseth
+ */
+public class Properties implements Cloneable {
+
+ private Properties chained = null;
+
+ /**
+ * Sets the properties chained to this.
+ *
+ * @param chained the properties to chain to this, or null to make this the last in the chain
+ * @return the given chained object to allow setting up a chain by dotting in one statement
+ */
+ public Properties chain(Properties chained) {
+ this.chained = chained;
+ return chained;
+ }
+
+ /**
+ * Returns the properties chained to this, or null if this is the last in the chain
+ */
+ public Properties chained() {
+ return chained;
+ }
+
+ /**
+ * Returns the first instance of the given class in this chain, or null if none
+ */
+ @SuppressWarnings("unchecked")
+ public final <T extends Properties> T getInstance(Class<T> propertyClass) {
+ if (propertyClass.isAssignableFrom(this.getClass())) return (T) this;
+ if (chained == null) return null;
+ return chained.getInstance(propertyClass);
+ }
+
+ /**
+ * Lists all properties of this with no context, by delegating to listProperties("")
+ */
+ public final Map<String, Object> listProperties() {
+ return listProperties(CompoundName.empty);
+ }
+
+ /**
+ * Returns a snapshot of all properties of this - same as listProperties("",context)
+ */
+ public final Map<String, Object> listProperties(Map<String, String> context) {
+ return listProperties(CompoundName.empty, context, this);
+ }
+
+ /**
+ * Returns a snapshot of all properties by calling listProperties(path,null)
+ */
+ public final Map<String, Object> listProperties(CompoundName path) {
+ return listProperties(path, null, this);
+ }
+
+ /**
+ * Returns a snapshot of all properties by calling listProperties(path,null)
+ */
+ public final Map<String, Object> listProperties(String path) {
+ return listProperties(new CompoundName(path), null, this);
+ }
+
+ /**
+ * Returns a snapshot of all properties by calling listProperties(path,null)
+ */
+ public final Map<String, Object> listProperties(CompoundName path, Map<String, String> context) {
+ return listProperties(path, context, this);
+ }
+
+ /**
+ * Returns a snapshot of all properties by calling listProperties(path,null)
+ */
+ public final Map<String, Object> listProperties(String path, Map<String, String> context) {
+ return listProperties(new CompoundName(path), context, this);
+ }
+
+ /**
+ * Returns a snapshot of all properties of this having a given path prefix
+ * <p>
+ * Some sources of properties may not be list-able (e.g those using reflection)
+ * and will not be included in this snapshot.
+ *
+ *
+ * @param path the prefix (up to a ".") of the properties to return, or null or the empty string to return all properties
+ * @param context the context used to resolve the properties, or null if none
+ * @param substitution the properties which will be used to do string substitution in the values added to the map
+ */
+ public Map<String, Object> listProperties(CompoundName path, Map<String, String> context, Properties substitution) {
+ if (path == null)
+ path = CompoundName.empty;
+ if (chained() == null)
+ return new HashMap<>();
+ else
+ return chained().listProperties(path, context, this);
+ }
+
+ /**
+ * Returns a snapshot of all properties of this having a given path prefix
+ * <p>
+ * Some sources of properties may not be list-able (e.g those using reflection)
+ * and will not be included in this snapshot.
+ *
+ *
+ * @param path the prefix (up to a ".") of the properties to return, or null or the empty string to return all properties
+ * @param context the context used to resolve the properties, or null if none
+ * @param substitution the properties which will be used to do string substitution in the values added to the map
+ */
+ public final Map<String, Object> listProperties(String path, Map<String, String> context, Properties substitution) {
+ return listProperties(new CompoundName(path), context, substitution);
+ }
+
+ /**
+ * Gets a named value which (if necessary) is resolved using a property context.
+ *
+ * @param name the name of the property to return
+ * @param context the variant resolution context, or null if none
+ * @param substitution the properties used to substitute in these properties, or null if none
+ */
+ public Object get(CompoundName name, Map<String, String> context, Properties substitution) {
+ if (chained == null) return null;
+ return chained.get(name, context, substitution);
+ }
+
+ /**
+ * Gets a named value which (if necessary) is resolved using a property context
+ *
+ * @param name the name of the property to return
+ * @param context the variant resolution context, or null if none
+ * @param substitution the properties used to substitute in these properties, or null if none
+ */
+ public final Object get(String name, Map<String, String> context, Properties substitution) {
+ return get(new CompoundName(name), context, substitution);
+ }
+
+ /**
+ * Gets a named value from the first chained instance which has one by calling get(name,context,this)
+ */
+ public final Object get(CompoundName name, Map<String, String> context) {
+ return get(name, context, this);
+ }
+
+ /**
+ * Gets a named value from the first chained instance which has one by calling get(name,context,this)
+ */
+ public final Object get(String name, Map<String, String> context) {
+ return get(new CompoundName(name), context, this);
+ }
+
+ /**
+ * Gets a named value from the first chained instance which has one by calling get(name,null,this)
+ */
+ public final Object get(CompoundName name) {
+ return get(name, null, this);
+ }
+
+ /**
+ * Gets a named value from the first chained instance which has one by calling get(name,null,this)
+ */
+ public final Object get(String name) {
+ return get(new CompoundName(name), null, this);
+ }
+
+ /**
+ * Gets a named value from the first chained instance which has one,
+ * or the default value if no value is set, or if the first value encountered is explicitly set to null.
+ * <p>
+ * This default implementation simply forwards to the chained instance, or returns the default if none
+ *
+ *
+ * @param name the name of the property to return
+ * @param defaultValue the default value returned if the value returned is null
+ */
+ public final Object get(CompoundName name, Object defaultValue) {
+ Object value = get(name);
+ if (value == null) return defaultValue;
+ return value;
+ }
+
+ /**
+ * Gets a named value from the first chained instance which has one,
+ * or the default value if no value is set, or if the first value encountered is explicitly set to null.
+ * <p>
+ * This default implementation simply forwards to the chained instance, or returns the default if none
+ *
+ * @param name the name of the property to return
+ * @param defaultValue the default value returned if the value returned is null
+ */
+ public final Object get(String name, Object defaultValue) {
+ return get(new CompoundName(name), defaultValue);
+ }
+
+ /**
+ * Sets a value to the first chained instance which accepts it.
+ * <p>
+ * This default implementation forwards to the chained instance or throws
+ * a RuntimeException if there is not chained instance.
+ *
+ * @param name the name of the value
+ * @param value the value to set. Setting a name to null explicitly is legal.
+ * @param context the context used to resolve where the values should be set, or null if none
+ * @throws RuntimeException if no instance in the chain accepted this name-value pair
+ */
+ public void set(CompoundName name, Object value, Map<String, String> context) {
+ if (chained == null) throw new RuntimeException("Property '" + name + "->" + value +
+ "' was not accepted in this property chain");
+ chained.set(name, value, context);
+ }
+
+ /**
+ * Sets a value to the first chained instance which accepts it.
+ * <p>
+ * This default implementation forwards to the chained instance or throws
+ * a RuntimeException if there is not chained instance.
+ *
+ * @param name the name of the value
+ * @param value the value to set. Setting a name to null explicitly is legal.
+ * @param context the context used to resolve where the values should be set, or null if none
+ * @throws RuntimeException if no instance in the chain accepted this name-value pair
+ */
+ public final void set(String name, Object value, Map<String, String> context) {
+ set(new CompoundName(name), value, context);
+ }
+
+ /**
+ * Sets a value to the first chained instance which accepts it by calling set(name,value,null).
+ *
+ * @param name the name of the value
+ * @param value the value to set. Setting a name to null explicitly is legal.
+ * @throws RuntimeException if no instance in the chain accepted this name-value pair
+ */
+ public final void set(CompoundName name, Object value) {
+ set(name, value, null);
+ }
+
+ /**
+ * Sets a value to the first chained instance which accepts it by calling set(name,value,null).
+ *
+ * @param name the name of the value
+ * @param value the value to set. Setting a name to null explicitly is legal.
+ * @throws RuntimeException if no instance in the chain accepted this name-value pair
+ */
+ public final void set(String name, Object value) {
+ set(new CompoundName(name), value, Collections.<String,String>emptyMap());
+ }
+
+ /**
+ * Gets a property as a boolean - if this value can reasonably be interpreted as a boolean, this will return
+ * the value. Returns false if this property is null.
+ */
+ public final boolean getBoolean(CompoundName name) {
+ return getBoolean(name, false);
+ }
+
+ /**
+ * Gets a property as a boolean - if this value can reasonably be interpreted as a boolean, this will return
+ * the value. Returns false if this property is null.
+ */
+ public final boolean getBoolean(String name) {
+ return getBoolean(name, false);
+ }
+
+ /**
+ * Gets a property as a boolean.
+ * This will return true only if the value is either the empty string,
+ * or any Object which has a toString which is case-insensitive equal to "true"
+ *
+ * @param defaultValue the value to return if this property is null
+ */
+ public final boolean getBoolean(CompoundName key, boolean defaultValue) {
+ return asBoolean(get(key), defaultValue);
+ }
+
+ /**
+ * Gets a property as a boolean.
+ * This will return true only if the value is either the empty string,
+ * or any Object which has a toString which is case-insensitive equal to "true"
+ *
+ * @param defaultValue the value to return if this property is null
+ */
+ public final boolean getBoolean(String key, boolean defaultValue) {
+ return asBoolean(get(key), defaultValue);
+ }
+
+ /**
+ * Converts a value to boolean - this will be true only if the value is either the empty string,
+ * or any Object which has a toString which is case-insensitive equal to "true"
+ */
+ protected final boolean asBoolean(Object value, boolean defaultValue) {
+ if (value == null) return defaultValue;
+
+ String s = value.toString();
+ int sz = s.length();
+ switch (sz) {
+ case 0:
+ return true;
+ case 4:
+ return ((s.charAt(0) | 0x20) == 't') &&
+ ((s.charAt(1) | 0x20) == 'r') &&
+ ((s.charAt(2) | 0x20) == 'u') &&
+ ((s.charAt(3) | 0x20) == 'e');
+ }
+ return false;
+ }
+
+ /**
+ * Returns this property as a string
+ *
+ * @return this property as a string, or null if the property is null
+ */
+ public final String getString(CompoundName key) {
+ return getString(key, null);
+ }
+
+ /**
+ * Returns this property as a string
+ *
+ * @return this property as a string, or null if the property is null
+ */
+ public final String getString(String key) {
+ return getString(key, null);
+ }
+
+ /**
+ * Returns this property as a string
+ *
+ * @param key the property key
+ * @param defaultValue the value to return if this property is null
+ * @return this property as a string
+ */
+ public final String getString(CompoundName key, String defaultValue) {
+ return asString(get(key), defaultValue);
+ }
+
+ /**
+ * Returns this property as a string
+ *
+ * @param key the property key
+ * @param defaultValue the value to return if this property is null
+ * @return this property as a string
+ */
+ public final String getString(String key, String defaultValue) {
+ return asString(get(key), defaultValue);
+ }
+
+ protected final String asString(Object value, String defaultValue) {
+ if (value == null) return defaultValue;
+ return value.toString();
+ }
+
+ /**
+ * Returns a property as an Integer
+ *
+ * @return the integer value of the name, or null if the property is null
+ * @throws NumberFormatException if the given parameter exists but
+ * have a toString which is not parseable as a number
+ */
+ public final Integer getInteger(CompoundName name) {
+ return getInteger(name, null);
+ }
+
+ /**
+ * Returns a property as an Integer
+ *
+ * @return the integer value of the name, or null if the property is null
+ * @throws NumberFormatException if the given parameter exists but
+ * have a toString which is not parseable as a number
+ */
+ public final Integer getInteger(String name) {
+ return getInteger(name, null);
+ }
+
+ /**
+ * Returns a property as an Integer
+ *
+ * @param name the property name
+ * @param defaultValue the value to return if this property is null
+ * @return the integer value for the name
+ * @throws NumberFormatException if the given parameter does not exist
+ * or does not have a toString parseable as a number
+ */
+ public final Integer getInteger(CompoundName name, Integer defaultValue) {
+ return asInteger(get(name), defaultValue);
+ }
+
+ /**
+ * Returns a property as an Integer
+ *
+ * @param name the property name
+ * @param defaultValue the value to return if this property is null
+ * @return the integer value for the name
+ * @throws NumberFormatException if the given parameter does not exist
+ * or does not have a toString parseable as a number
+ */
+ public final Integer getInteger(String name, Integer defaultValue) {
+ return asInteger(get(name), defaultValue);
+ }
+
+ protected final Integer asInteger(Object value, Integer defaultValue) {
+ try {
+ if (value == null)
+ return defaultValue;
+
+ if (value instanceof Integer)
+ return (Integer) value;
+
+ String stringValue = value.toString();
+ if (stringValue.isEmpty())
+ return defaultValue;
+
+ return new Integer(stringValue);
+ } catch (IllegalArgumentException e) {
+ throw new NumberFormatException("Not a valid integer");
+ }
+ }
+
+ /**
+ * Returns a property as a Long
+ *
+ * @return the long value of the name, or null if the property is null
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Long getLong(CompoundName name) {
+ return getLong(name, null);
+ }
+
+ /**
+ * Returns a property as a Long
+ *
+ * @return the long value of the name, or null if the property is null
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Long getLong(String name) {
+ return getLong(name, null);
+ }
+
+ /**
+ * Returns a property as a Long
+ *
+ * @param name the property name
+ * @param defaultValue the value to return if this property is null
+ * @return the integer value for this name
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Long getLong(CompoundName name, Long defaultValue) {
+ return asLong(get(name), defaultValue);
+ }
+
+ /**
+ * Returns a property as a Long
+ *
+ * @param name the property name
+ * @param defaultValue the value to return if this property is null
+ * @return the integer value for this name
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Long getLong(String name, Long defaultValue) {
+ return asLong(get(name), defaultValue);
+ }
+
+ protected final Long asLong(Object value, Long defaultValue) {
+ try {
+ if (value == null)
+ return defaultValue;
+
+ if (value instanceof Long)
+ return (Long) value;
+
+ String stringValue = value.toString();
+ if (stringValue.isEmpty())
+ return defaultValue;
+
+ return new Long(value.toString());
+ } catch (IllegalArgumentException e) {
+ throw new NumberFormatException("Not a valid long");
+ }
+ }
+
+ /**
+ * Returns a property as a Double
+ *
+ * @return the integer value of the name, or null if the property is null
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Double getDouble(CompoundName name) {
+ return getDouble(name, null);
+ }
+
+ /**
+ * Returns a property as a Double
+ *
+ * @return the integer value of the name, or null if the property is null
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Double getDouble(String name) {
+ return getDouble(name, null);
+ }
+
+ /**
+ * Returns a property as a Double
+ *
+ * @param name the property name
+ * @param defaultValue the value to return if this property is null
+ * @return the integer value for this name
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Double getDouble(CompoundName name, Double defaultValue) {
+ return asDouble(get(name), defaultValue);
+ }
+
+ /**
+ * Returns a property as a Double
+ *
+ * @param name the property name
+ * @param defaultValue the value to return if this property is null
+ * @return the integer value for this name
+ * @throws NumberFormatException if the given parameter exists but have a value which
+ * is not parseable as a number
+ */
+ public final Double getDouble(String name, Double defaultValue) {
+ return asDouble(get(name), defaultValue);
+ }
+
+ protected final Double asDouble(Object value, Double defaultValue) {
+ try {
+ if (value == null)
+ return defaultValue;
+
+ if (value instanceof Double)
+ return (Double) value;
+
+ String stringValue = value.toString();
+ if (stringValue.isEmpty())
+ return defaultValue;
+
+ return new Double(value.toString());
+ } catch (IllegalArgumentException e) {
+ throw new NumberFormatException("Not a valid double");
+ }
+ }
+
+ /**
+ * Clones this instance and recursively all chained instance.
+ * Implementations should call this and clone their own state as appropriate
+ */
+ public Properties clone() {
+ try {
+ Properties clone = (Properties) super.clone();
+ if (chained != null)
+ clone.chained = this.chained.clone();
+ return clone;
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Will never happen");
+ }
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/request/package-info.java b/processing/src/main/java/com/yahoo/processing/request/package-info.java
new file mode 100644
index 00000000000..65b467c588c
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/request/package-info.java
@@ -0,0 +1,6 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi package com.yahoo.processing.request;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/processing/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java b/processing/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java
new file mode 100644
index 00000000000..9a1441196f3
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java
@@ -0,0 +1,146 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request.properties;
+
+import com.yahoo.component.provider.FreezableClass;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.processing.request.Properties;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.logging.Logger;
+
+/**
+ * A HashMap backing of Properties.
+ * <p>
+ * When this is cloned it will deep copy not only the model object map, but also each
+ * clonable member inside the map.
+ * <p>
+ * Subclassing is supported, a hook can be implemented to provide conditional inclusion in the map.
+ * By default - all properties are accepted, so set is never propagated.
+ * <p>
+ * This class is not multithread safe.
+ *
+ * @author bratseth
+ */
+public class PropertyMap extends Properties {
+
+ private static Logger log = Logger.getLogger(PropertyMap.class.getName());
+
+ /**
+ * The properties of this
+ */
+ private Map<CompoundName, Object> properties = new HashMap<>();
+
+ public void set(CompoundName name, Object value, Map<String, String> context) {
+ if (shouldSet(name, value))
+ properties.put(name, value);
+ else
+ super.set(name, value, context);
+ }
+
+ /**
+ * Return true if this value should be set in this map, false if the set should be propagated instead
+ * This default implementation always returns true.
+ */
+ protected boolean shouldSet(CompoundName name, Object value) {
+ return true;
+ }
+
+ public
+ @Override
+ Object get(CompoundName name, Map<String, String> context,
+ com.yahoo.processing.request.Properties substitution) {
+ if (!properties.containsKey(name)) return super.get(name, context, substitution);
+ return properties.get(name);
+ }
+
+ public
+ @Override
+ PropertyMap clone() {
+ PropertyMap clone = (PropertyMap) super.clone();
+ clone.properties = cloneMap(this.properties);
+ return clone;
+ }
+
+ /**
+ * Clones a map by deep cloning each value which is cloneable and shallow copying all other values.
+ */
+ public static Map<CompoundName, Object> cloneMap(Map<CompoundName, Object> map) {
+ Map<CompoundName, Object> cloneMap = new HashMap<>();
+ for (Map.Entry<CompoundName, Object> entry : map.entrySet()) {
+ Object cloneValue = clone(entry.getValue());
+ if (cloneValue == null)
+ cloneValue = entry.getValue(); // Shallow copy objects which does not support cloning
+ cloneMap.put(entry.getKey(), cloneValue);
+ }
+ return cloneMap;
+ }
+
+ /**
+ * Clones this object if it is clonable, and the clone is public. Returns null if not
+ */
+ public static Object clone(Object object) {
+ if (object == null) return null;
+ if (!(object instanceof Cloneable)) return null;
+ if (object instanceof Object[])
+ return arrayClone((Object[]) object);
+ else
+ return objectClone(object);
+ }
+
+ private static Object arrayClone(Object[] object) {
+ Object[] arrayClone = Arrays.copyOf(object, object.length);
+ // deep clone
+ for (int i = 0; i < arrayClone.length; i++) {
+ Object elementClone = clone(arrayClone[i]);
+ if (elementClone != null)
+ arrayClone[i] = elementClone;
+ }
+ return arrayClone;
+ }
+
+ private static Object objectClone(Object object) {
+ // Fastpath for our own commonly used classes
+ if (object instanceof FreezableClass) {
+ // List common superclass of 'com.yahoo.search.result.Hit'
+ return ((FreezableClass) object).clone();
+ }
+ else if (object instanceof PublicCloneable) {
+ return ((PublicCloneable)object).clone();
+ }
+ else if (object instanceof LinkedList) { // TODO: Why? Somebody's infatuation with LinkedList knows no limits
+ return ((LinkedList) object).clone();
+ }
+
+ try {
+ Method cloneMethod = object.getClass().getMethod("clone");
+ return cloneMethod.invoke(object);
+ } catch (NoSuchMethodException e) {
+ log.warning("'" + object + "' is Cloneable, but has no clone method - will use the same instance in all requests");
+ return null;
+ } catch (IllegalAccessException e) {
+ log.warning("'" + object + "' is Cloneable, but clone method cannot be accessed - will use the same instance in all requests");
+ return null;
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("Exception cloning '" + object + "'", e);
+ }
+ }
+
+ @Override
+ public Map<String, Object> listProperties(CompoundName path, Map<String, String> context, Properties substitution) {
+ Map<String, Object> map = super.listProperties(path, context, substitution);
+
+ for (Map.Entry<CompoundName, Object> entry : properties.entrySet()) {
+ if ( ! entry.getKey().hasPrefix(path)) continue;
+ CompoundName propertyName = entry.getKey().rest(path.size());
+ if (propertyName.isEmpty()) continue;
+ map.put(propertyName.toString(), entry.getValue());
+ }
+ return map;
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java b/processing/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java
new file mode 100644
index 00000000000..f15b1a93334
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request.properties;
+
+/**
+ * This interface publicly exposes the clone method.
+ * Classes which are used in request properties may implement this to allow faster cloning of the request.
+ *
+ * @author bratseth
+ * @since 5.66
+ */
+public interface PublicCloneable<T> extends Cloneable {
+
+ public T clone();
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/request/properties/package-info.java b/processing/src/main/java/com/yahoo/processing/request/properties/package-info.java
new file mode 100644
index 00000000000..c0b5ccad151
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/request/properties/package-info.java
@@ -0,0 +1,6 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi package com.yahoo.processing.request.properties;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/processing/src/main/java/com/yahoo/processing/response/AbstractData.java b/processing/src/main/java/com/yahoo/processing/response/AbstractData.java
new file mode 100644
index 00000000000..3595315bd62
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/AbstractData.java
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.processing.Request;
+
+/**
+ * Convenience superclass for implementations of data. This contains no payload.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public abstract class AbstractData extends ListenableFreezableClass implements Data {
+
+ private Request request;
+
+ /**
+ * Creates some data marked with the request that created it
+ */
+ public AbstractData(Request request) {
+ this.request = request;
+ }
+
+ /**
+ * Returns the request that created this data
+ */
+ public Request request() {
+ return request;
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/processing/src/main/java/com/yahoo/processing/response/AbstractDataList.java
new file mode 100644
index 00000000000..845ab041d73
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/AbstractDataList.java
@@ -0,0 +1,161 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.yahoo.component.provider.ListenableFreezableClass;
+import com.yahoo.processing.Request;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A convenience superclass for dataList implementations which handles references to the request and to incoming data.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public abstract class AbstractDataList<DATATYPE extends Data> extends ListenableFreezableClass implements DataList<DATATYPE>, Streamed, Ordered {
+
+ private final boolean ordered;
+ private final boolean streamed;
+
+ /**
+ * The request which caused this to be created
+ */
+ private final Request request;
+
+ /**
+ * The recipient of incoming data to this. Never null, but may be a null recipient.
+ */
+ private final IncomingData<DATATYPE> incomingData;
+
+ private final ListenableFuture<DataList<DATATYPE>> completedFuture;
+
+ /**
+ * Creates a simple data list which does not allow late incoming data
+ *
+ * @param request the request which created this data list
+ */
+ protected AbstractDataList(Request request) {
+ // Cannot call the constructor below because this must be given out below
+ this.request = request;
+ this.incomingData = new IncomingData.NullIncomingData<>(this);
+ this.completedFuture = new DrainOnGetFuture<>(this);
+ ordered = true;
+ streamed = true;
+ }
+
+ /**
+ * Creates a simple data list which receives incoming data in the given instance
+ *
+ * @param request the request which created this data list, never null
+ * @param incomingData the recipient of incoming data to this list, never null
+ */
+ protected AbstractDataList(Request request, IncomingData<DATATYPE> incomingData) {
+ this(request, incomingData, true, true);
+ }
+
+ /**
+ * Creates a simple data list which receives incoming data in the given instance
+ *
+ * @param request the request which created this data list, never null
+ * @param incomingData the recipient of incoming data to this list, never null
+ */
+ protected AbstractDataList(Request request, IncomingData<DATATYPE> incomingData, boolean ordered, boolean streamed) {
+ if (request == null) throw new NullPointerException("Request cannot be null");
+ if (incomingData == null) throw new NullPointerException("incomingData cannot be null");
+
+ this.request = request;
+ this.incomingData = incomingData;
+ this.completedFuture = new DrainOnGetFuture<>(this);
+ this.ordered = ordered;
+ this.streamed = streamed;
+ }
+
+ /**
+ * Returns the request which created this data
+ */
+ public Request request() {
+ return request;
+ }
+
+ /**
+ * Returns the holder of incoming data to this.
+ * This may be used to add, consume, wait for and be notified about incoming data.
+ * If this instance does not support late incoming data, the read methods of the return object behaves
+ * as expected and is synchronization free. The write methods throws an exception.
+ */
+ public IncomingData<DATATYPE> incoming() {
+ return incomingData;
+ }
+
+ public ListenableFuture<DataList<DATATYPE>> complete() {
+ return completedFuture;
+ }
+
+ @Override
+ public boolean isOrdered() { return ordered; }
+
+ @Override
+ public boolean isStreamed() { return streamed; }
+
+ public String toString() {
+ return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]");
+ }
+
+ public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+
+ private final DataList<DATATYPE> owner;
+
+ public DrainOnGetFuture(DataList<DATATYPE> owner) {
+ this.owner = owner;
+ }
+
+ /**
+ * Returns false as this is not cancellable
+ */
+ @Override
+ public boolean cancel(boolean b) {
+ return false;
+ }
+
+ /**
+ * Returns false as this is not cancellable
+ */
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ /**
+ * Wait until all data is available. When this returns all data is available in the returned data list.
+ */
+ @Override
+ public DataList<DATATYPE> get() throws InterruptedException, ExecutionException {
+ return drain(owner.incoming().completed().get());
+ }
+
+ /**
+ * Wait until all data is available.
+ * When and if this returns normally all data is available in the returned data list
+ */
+ @Override
+ public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
+ return drain(owner.incoming().completed().get(timeout, timeUnit));
+ }
+
+ private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) {
+ for (DATATYPE item : dataList.incoming().drain())
+ dataList.add(item);
+ set(dataList); // Signal completion to listeners
+ return dataList;
+ }
+
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/ArrayDataList.java b/processing/src/main/java/com/yahoo/processing/response/ArrayDataList.java
new file mode 100644
index 00000000000..d30d1848c7f
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/ArrayDataList.java
@@ -0,0 +1,130 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.yahoo.collections.FreezableArrayList;
+import com.yahoo.processing.Request;
+
+import java.util.List;
+
+/**
+ * A data list backed by an array.
+ * This implementation supports subclassing.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class ArrayDataList<DATATYPE extends Data> extends AbstractDataList<DATATYPE> {
+
+ private final FreezableArrayList<DATATYPE> dataList = new FreezableArrayList<>(true);
+
+ /**
+ * Creates a simple data list which does not allow late incoming data
+ *
+ * @param request the request which created this data list
+ */
+ protected ArrayDataList(Request request) {
+ super(request);
+ }
+
+ /**
+ * Creates a simple data list which receives incoming data in the given instance
+ *
+ * @param request the request which created this data list, never null
+ * @param incomingData the recipient of incoming data to this list, never null
+ */
+ protected ArrayDataList(Request request, IncomingData<DATATYPE> incomingData) {
+ this(request, incomingData, true, true);
+ }
+
+ /**
+ * Creates a simple data list which receives incoming data in the given instance
+ *
+ * @param request the request which created this data list, never null
+ * @param incomingData the recipient of incoming data to this list, never null
+ */
+ protected ArrayDataList(Request request, IncomingData<DATATYPE> incomingData, boolean ordered, boolean streamed) {
+ super(request, incomingData, ordered, streamed);
+ }
+
+ /**
+ * Creates a simple data list which does not allow late incoming data
+ *
+ * @param request the request which created this data list
+ */
+ public static <DATATYPE extends Data> ArrayDataList<DATATYPE> create(Request request) {
+ return new ArrayDataList<>(request);
+ }
+
+ /**
+ * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData)
+ */
+ public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsync(Request request) {
+ DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>();
+ ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData);
+ incomingData.assignOwner(dataList);
+ return dataList;
+ }
+
+ /**
+ * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData),
+ * and where this data can be rendered in any order.
+ */
+ public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsyncUnordered(Request request) {
+ DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>();
+ ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData, false, true);
+ incomingData.assignOwner(dataList);
+ return dataList;
+ }
+
+ /**
+ * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData)
+ * and where this data cannot be returned to clients until this is completed.
+ */
+ public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsyncNonstreamed(Request request) {
+ DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>();
+ ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData, true, false);
+ incomingData.assignOwner(dataList);
+ return dataList;
+ }
+
+ public DATATYPE add(DATATYPE data) {
+ dataList.add(data);
+ return data;
+ }
+
+ /**
+ * Returns the data element at index
+ */
+ public DATATYPE get(int index) {
+ return dataList.get(index);
+ }
+
+ /**
+ * Returns a reference to the list backing this. The list may be modified freely,
+ * unless this is frozen. If frozen, the only permissible write operations are those that
+ * add new items to the end of the list.
+ */
+ public List<DATATYPE> asList() {
+ return dataList;
+ }
+
+ @Override
+ public void addDataListener(Runnable runnable) {
+ dataList.addListener(runnable);
+ }
+
+ /**
+ * Irreversibly prevent further changes to the items of this.
+ * This allows the processing engine to start streaming the current content of this list back to the
+ * client (if applicable).
+ * <p>
+ * Adding new items to the end of this list is permitted even after freeze.
+ * If frozen, those items may be streamed back to the client immediately on add.
+ * <p>
+ * Calling this on a frozen list has no effect.
+ */
+ public void freeze() {
+ super.freeze();
+ dataList.freeze();
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/Data.java b/processing/src/main/java/com/yahoo/processing/response/Data.java
new file mode 100644
index 00000000000..31207210ef2
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/Data.java
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.yahoo.component.provider.ListenableFreezable;
+import com.yahoo.processing.Request;
+
+/**
+ * A data item created due to a processing request.
+ * <p>
+ * If a data item is <i>frozen</i> it is illegal to make further changes to its payload or referenced request.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+// TODO: Have DataList implement this instead, probably (should be a safe change in practise)
+public interface Data extends ListenableFreezable {
+
+ /**
+ * Returns the request that created this data
+ */
+ public Request request();
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/DataList.java b/processing/src/main/java/com/yahoo/processing/response/DataList.java
new file mode 100644
index 00000000000..42cb1f98a70
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/DataList.java
@@ -0,0 +1,85 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+
+/**
+ * A list of data items created due to a processing request.
+ * This list is itself a data item, allowing data items to be organized into a composite tree.
+ * <p>
+ * A data list can be frozen even though its child data items are not.
+ * When a datalist is frozen the only permissible write operation is to add new items
+ * to the end of the list.
+ * <p>
+ * Content in a frozen list may be returned to the requesting client immediately by the underlying engine,
+ * even if the Response owning the list is not returned yet.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public interface DataList<DATATYPE extends Data> extends Data {
+
+ /**
+ * Adds a child data item to this.
+ *
+ * @param data the data to add to this
+ * @return the input data instance, for chaining
+ */
+ public DATATYPE add(DATATYPE data);
+
+ public DATATYPE get(int index);
+
+ /**
+ * Returns the content of this as a List.
+ * The returned list is either a read-only snapshot or an editable reference to the content of this.
+ * If the returned list is editable and this is frozen, the only allowed operation is to add new items
+ * to the end of the list.
+ */
+ public List<DATATYPE> asList();
+
+ /**
+ * Returns the buffer of incoming/future data to this list.
+ * This can be used to provide data to this list from other threads, after its creation,
+ * and to consume, wait for or be notified upon the arrival of such data.
+ * <p>
+ * Some list instances do not support late incoming data,
+ * such lists responds to <i>read</i> calls to IncomingData as expected and without
+ * incurring any synchronization, and throws an exception on <i>write</i> calls.
+ */
+ public IncomingData<DATATYPE> incoming();
+
+ /**
+ * Returns a future in which all incoming data in this has become available.
+ * This has two uses:
+ * <ul>
+ * <li>Calling {@link #get} on this future will block (if necessary) until all incoming data has arrived,
+ * transfer that data from the incoming buffer into this list and invoke any listeners on this event
+ * on the calling thread.
+ * <li>Adding a listener on this future will cause it to be called when completion().get() is called, <i>after</i>
+ * the incoming data has been moved to this thread and <i>before</i> the get() call returns.
+ * </ul>
+ * <p>
+ * Note that if no thread calls completed().get(), this future will never occur.
+ * <p>
+ * Any data list consumer who wishes to make sure it sees the complete data for this list
+ * <i>must</i> call <code>dataList.complete().get()</code> before consuming this list.
+ * If a guaranteed non-blocking call to this method is desired, register a listener on the future where all
+ * data is available for draining (that is, on <code>dataList.incoming().completed()</code>)
+ * and resume by calling this method from the listener.
+ * <p>
+ * Making this call on a list which does not support future data always returns immediately and
+ * causes no memory synchronization cost.
+ */
+ public ListenableFuture<DataList<DATATYPE>> complete();
+
+ /**
+ * Adds a listener which is invoked every time data is added to this list.
+ * The listener is always invoked on the same thread which is adding the data,
+ * and hence it can modify this list freely without synchronization.
+ */
+ public void addDataListener(Runnable runnable);
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/processing/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
new file mode 100644
index 00000000000..212644a186f
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java
@@ -0,0 +1,150 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.google.common.util.concurrent.ExecutionList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.yahoo.collections.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+/**
+ * The default incoming data implementation
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> {
+
+ private DataList<DATATYPE> owner = null;
+
+ private final SettableFuture<DataList<DATATYPE>> completionFuture;
+
+ private final List<DATATYPE> dataList = new ArrayList<>();
+
+ private List<Tuple2<Runnable,Executor>> newDataListeners = null;
+
+ /**
+ * If this is completed no more data can be added
+ */
+ private boolean complete = false;
+
+ /**
+ * Creates an instance which must be assigned an owner after creation
+ */
+ public DefaultIncomingData() {
+ this(null);
+ }
+
+ public DefaultIncomingData(DataList<DATATYPE> owner) {
+ assignOwner(owner);
+ completionFuture = SettableFuture.create();
+ }
+
+ /**
+ * Assigns the owner of this. Throws an exception if the owner is already set.
+ */
+ public final void assignOwner(DataList<DATATYPE> owner) {
+ if (this.owner != null) throw new NullPointerException("Owner of " + this + " was already assigned");
+ this.owner = owner;
+ }
+
+ @Override
+ public DataList<DATATYPE> getOwner() {
+ return owner;
+ }
+
+ @Override
+ public ListenableFuture<DataList<DATATYPE>> completed() {
+ return completionFuture;
+ }
+
+ /**
+ * Returns whether the data in this is complete
+ */
+ @Override
+ public synchronized boolean isComplete() {
+ return complete;
+ }
+
+ /**
+ * Add new data and mark this as completed
+ */
+ @Override
+ public synchronized void addLast(DATATYPE data) {
+ addLast(Collections.singletonList(data));
+ }
+
+ /**
+ * Add new data without completing this
+ */
+ @Override
+ public synchronized void add(DATATYPE data) {
+ add(Collections.singletonList(data));
+ }
+
+ /**
+ * Add new data and mark this as completed
+ */
+ @Override
+ public synchronized void addLast(List<DATATYPE> data) {
+ add(data);
+ markComplete();
+ }
+
+ /**
+ * Add new data without completing this
+ */
+ @Override
+ public synchronized void add(List<DATATYPE> data) {
+ if (complete) throw new IllegalStateException("Attempted to add data to completed " + this);
+
+ dataList.addAll(data);
+ notifyDataListeners();
+ }
+
+ /**
+ * Mark this as completed and notify any listeners
+ */
+ @Override
+ public synchronized void markComplete() {
+ complete = true;
+ completionFuture.set(owner);
+ }
+
+ /**
+ * Get and remove all the data currently available in this.
+ * The returned list is a modifiable fresh instance owned by the caller.
+ */
+ public synchronized List<DATATYPE> drain() {
+ List<DATATYPE> dataListCopy = new ArrayList<>(dataList);
+ dataList.clear();
+ return dataListCopy;
+ }
+
+ @Override
+ public void addNewDataListener(Runnable listener, Executor executor) {
+ synchronized (this) {
+ if (newDataListeners == null)
+ newDataListeners = new ArrayList<>();
+ newDataListeners.add(new Tuple2<>(listener, executor));
+ if (dataList.isEmpty()) return;
+ }
+ notifyDataListeners();
+ }
+
+ private void notifyDataListeners() {
+ if (newDataListeners == null) return;
+ for (Tuple2<Runnable, Executor> listener : newDataListeners) {
+ listener.second.execute(listener.first);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "incoming: " + (complete ? "complete" : "incomplete") + ", data " + dataList;
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/FutureResponse.java b/processing/src/main/java/com/yahoo/processing/response/FutureResponse.java
new file mode 100644
index 00000000000..7653b8e6c28
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/FutureResponse.java
@@ -0,0 +1,82 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.google.common.util.concurrent.ForwardingFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.request.ErrorMessage;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A processing response which will arrive in the future.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class FutureResponse extends ForwardingFuture<Response> {
+
+ private final Request request;
+
+ /**
+ * Only used for generating messages
+ */
+ private final Execution execution;
+
+ private final static Logger log = Logger.getLogger(FutureResponse.class.getName());
+
+ private final ListenableFutureTask<Response> futureTask;
+
+ public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) {
+ this.futureTask = ListenableFutureTask.create(callable);
+ this.request = request;
+ this.execution = execution;
+ }
+
+ @Override
+ public ListenableFutureTask<Response> delegate() {
+ return futureTask;
+ }
+
+ public
+ @Override
+ Response get() {
+ try {
+ return super.get();
+ } catch (InterruptedException e) {
+ return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Exception on executing " + execution + " for " + request, e);
+ return new Response(request, new ErrorMessage("Error in '" + execution + "'", e));
+ }
+ }
+
+ public
+ @Override
+ Response get(long timeout, TimeUnit timeunit) {
+ try {
+ return super.get(timeout, timeunit);
+ } catch (InterruptedException e) {
+ return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e));
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Exception on executing " + execution + " for " + request, e);
+ return new Response(request, new ErrorMessage("Error in '" + execution + "'", e));
+ } catch (TimeoutException e) {
+ return new Response(request, new ErrorMessage("Error executing '" + execution + "': " + " Chain timed out."));
+ }
+ }
+
+ /**
+ * Returns the query used in this execution, never null
+ */
+ public Request getRequest() {
+ return request;
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/IncomingData.java b/processing/src/main/java/com/yahoo/processing/response/IncomingData.java
new file mode 100644
index 00000000000..5939f56a8f3
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/IncomingData.java
@@ -0,0 +1,219 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A data list own once instance of this which can be used to provide data asynchronously to the list,
+ * and consume, wait for or be notified upon the arrival of such data.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public interface IncomingData<DATATYPE extends Data> {
+
+ /**
+ * Returns the owner (target DataList) of this.
+ * Note that accessing the owner from the thread producing incoming data
+ * is generally *not* thread safe.
+ */
+ public DataList<DATATYPE> getOwner();
+
+ /**
+ * Returns a future in which all the incoming data that will be produced in this is available.
+ * Listeners on this are invoked on the thread producing the incoming data (or a thread spawned from it),
+ * which in general is separate from the thread using the data list. Hence, listeners on this even cannot
+ * in general assume that they may modify the data list or the request.
+ * <p>
+ * The data is not {@link #drain drained} into the owner of this by this method. That must be done
+ * by the thread using the data list.
+ * <p>
+ * This return the list owning this for convenience.
+ */
+ public abstract ListenableFuture<DataList<DATATYPE>> completed();
+
+ /**
+ * Returns whether this is complete
+ */
+ public boolean isComplete();
+
+ /**
+ * Add new data and mark this as completed
+ *
+ * @throws IllegalStateException if this is already complete or does not allow writes
+ */
+ public void addLast(DATATYPE data);
+
+ /**
+ * Add new data without completing this
+ *
+ * @throws IllegalStateException if this is already complete or does not allow writes
+ */
+ public void add(DATATYPE data);
+
+ /**
+ * Add new data and mark this as completed
+ *
+ * @throws IllegalStateException if this is already complete or does not allow writes
+ */
+ public void addLast(List<DATATYPE> data);
+
+ /**
+ * Add new data without completing this.
+ *
+ * @throws IllegalStateException if this is already complete or does not allow writes
+ */
+ public void add(List<DATATYPE> data);
+
+ /**
+ * Mark this as completed and notify any listeners. If this is already complete this method does nothing.
+ */
+ public void markComplete();
+
+ /**
+ * Get and remove all the data currently available in this
+ */
+ public List<DATATYPE> drain();
+
+ /**
+ * Add a listener which will be invoked every time new data is added to this.
+ * This listener may be invoked at any time in any thread, any thread synchronization is left
+ * to the listener itself
+ */
+ public void addNewDataListener(Runnable listener, Executor executor);
+
+ /**
+ * Creates a null implementation of this which is empty and complete at creation:
+ * <ul>
+ * <li>Provides immediate return without incurring any memory synchronization for
+ * any read method.
+ * <li>Throws an exception on any write method
+ * </ul>
+ * <p>
+ * This allows consumers to check for completion the same way whether or not the data list in question
+ * supports asynchronous addition of data, and without incurring unnecessary costs.
+ */
+ public static final class NullIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> {
+
+ private DataList<DATATYPE> owner;
+ private final ImmediateFuture<DATATYPE> completionFuture;
+
+ public NullIncomingData(DataList<DATATYPE> owner) {
+ this.owner = owner;
+ completionFuture = new ImmediateFuture<>(owner);
+ }
+
+ public ListenableFuture<DataList<DATATYPE>> completed() {
+ return completionFuture;
+ }
+
+ @Override
+ public DataList<DATATYPE> getOwner() {
+ return owner;
+ }
+
+ /**
+ * Returns true
+ */
+ @Override
+ public boolean isComplete() {
+ return true;
+ }
+
+ /**
+ * @throws IllegalStateException as this is read only
+ */
+ public void addLast(DATATYPE data) {
+ throw new IllegalStateException(owner + " does not support adding data asynchronously");
+ }
+
+ /**
+ * @throws IllegalStateException as this is read only
+ */
+ public void add(DATATYPE data) {
+ throw new IllegalStateException(owner + " does not support adding data asynchronously");
+ }
+
+ /**
+ * @throws IllegalStateException as this is read only
+ */
+ public void addLast(List<DATATYPE> data) {
+ throw new IllegalStateException(owner + " does not support adding data asynchronously");
+ }
+
+ /**
+ * @throws IllegalStateException as this is read only
+ */
+ public void add(List<DATATYPE> data) {
+ throw new IllegalStateException(owner + " does not support adding data asynchronously");
+ }
+
+ /**
+ * Do nothing as this is already complete
+ */
+ public void markComplete() {
+ }
+
+ public List<DATATYPE> drain() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Adds a new data listener to this - this is a no-op
+ * as new data can never be added to this implementation.
+ */
+ public void addNewDataListener(Runnable listener, Executor executor) { }
+
+ public String toString() {
+ return "(no incoming)";
+ }
+
+ /**
+ * A future which is always done and incurs no synchronization.
+ * This is semantically the same as Futures.immediateFuture but contrary to it,
+ * this never causes any memory synchronization when accessed.
+ */
+ public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> {
+
+ private DataList<DATATYPE> owner;
+
+ public ImmediateFuture(DataList<DATATYPE> owner) {
+ this.owner = owner; // keep here to avoid memory synchronization for access
+ set(owner); // Signal completion (for future listeners)
+ }
+
+ @Override
+ public boolean cancel(boolean b) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return true;
+ }
+
+ @Override
+ public DataList<DATATYPE> get() {
+ return owner;
+ }
+
+ @Override
+ public DataList<DATATYPE> get(long l, TimeUnit timeUnit) {
+ return owner;
+ }
+
+ }
+
+ }
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/Ordered.java b/processing/src/main/java/com/yahoo/processing/response/Ordered.java
new file mode 100644
index 00000000000..10aeaaeb952
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/Ordered.java
@@ -0,0 +1,18 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+/**
+ * This is an <i>optional marker interface</i>.
+ * DataLists may implement this to return false to indicate that the order of the elements of
+ * the list is insignificant. The usage of this is to allow the content of a list to be rendered in the order
+ * in which it completes rather than in the order in which it is added to the list.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ * @since 5.1.19
+ */
+public interface Ordered {
+
+ /** Returns false if the data in this list can be returned in any order. Default: true, meaning the order matters */
+ public boolean isOrdered();
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/Streamed.java b/processing/src/main/java/com/yahoo/processing/response/Streamed.java
new file mode 100644
index 00000000000..6eab5a3287f
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/Streamed.java
@@ -0,0 +1,21 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.response;
+
+/**
+ * This is an <i>optional marker interface</i>.
+ * DataLists may implement this to return false to indicate that no data from the list should be returned to clients
+ * until it is completed. This is useful in cases where some decision making which may impact the content of the list
+ * must be deferred until the list is complete.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ * @since 5.1.19
+ */
+public interface Streamed {
+
+ /**
+ * Returns false if the data in this list can not be returned until it is completed.
+ * Default: true, meaning eager streaming of the data is permissible.
+ */
+ public boolean isStreamed();
+
+}
diff --git a/processing/src/main/java/com/yahoo/processing/response/package-info.java b/processing/src/main/java/com/yahoo/processing/response/package-info.java
new file mode 100644
index 00000000000..649a719c7d3
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/response/package-info.java
@@ -0,0 +1,6 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi package com.yahoo.processing.response;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java b/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
new file mode 100644
index 00000000000..fe1e0d75ee3
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java
@@ -0,0 +1,554 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.AsyncExecution;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.execution.ExecutionWithResponse;
+import com.yahoo.processing.execution.RunnableExecution;
+import com.yahoo.processing.request.ErrorMessage;
+import com.yahoo.processing.response.*;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A collection of processors for test purposes.
+ *
+ * @author bratseth
+ */
+public class ProcessorLibrary {
+
+ private ProcessorLibrary() {
+ }
+
+ // ---------------------------------------- Data types
+
+ public static class StringData extends AbstractData {
+
+ private String string;
+
+ public StringData(Request request, String string) {
+ super(request);
+ this.string = string;
+ }
+
+ public void setString(String string) {
+ this.string = string;
+ }
+
+ @Override
+ public String toString() {
+ return string;
+ }
+
+ }
+
+ public static class MapData extends AbstractData {
+
+ private Map map = new LinkedHashMap();
+
+ public MapData(Request request) {
+ super(request);
+ }
+
+ public Map map() { return map; }
+
+ @Override
+ public String toString() {
+ return "map data: " + map;
+ }
+
+ }
+
+ // ---------------------------------------- DataLists
+
+ public static class UnorderedArrayDataList extends ArrayDataList implements Ordered {
+
+ public UnorderedArrayDataList(Request request) {
+ super(request);
+ }
+
+ @Override
+ public boolean isOrdered() {return false; }
+
+ }
+
+ // ---------------------------------------- Processors
+
+ /**
+ * Makes some modifications to the request, passes it on and finally removes one data item from the response
+ */
+ public static class CombineData extends Processor {
+
+ public Response process(Request request, Execution execution) {
+ request.properties().set("appendage", request.properties().getInteger("appendage") + 1);
+ Response response = execution.process(request);
+
+ // Modify the response
+ StringData first = (StringData) response.data().get(0);
+ StringData third = (StringData) response.data().get(2);
+ first.setString(first.toString() + ", " + third.toString());
+ response.data().asList().remove(2);
+ return response;
+ }
+
+ }
+
+ /**
+ * Sends the request multiple times to get at least 6 pieces of data
+ */
+ public static class Get6DataItems extends Processor {
+
+ @SuppressWarnings("unchecked")
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ while (response.data().asList().size() < 6) {
+ request.properties().set("appendage", request.properties().getInteger("appendage") + 1);
+ Response additional = execution.process(request);
+ response.mergeWith(additional);
+ response.data().asList().addAll(additional.data().asList());
+ }
+ return response;
+ }
+
+ }
+
+ /**
+ * Produces 3 pieces of string data
+ */
+ public static class DataSource extends Processor {
+
+ @SuppressWarnings("unchecked")
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ response.data().add(new StringData(request, "first." + request.properties().get("appendage")));
+ response.data().add(new StringData(request, "second." + request.properties().get("appendage")));
+ response.data().add(new StringData(request, "third." + request.properties().get("appendage")));
+ return response;
+ }
+
+ }
+
+ public static class Federator extends Processor {
+
+ private final List<Chain<? extends Processor>> chains;
+
+ private final boolean ordered;
+
+ /**
+ * Federates over the given chains. Returns an ordered response.
+ */
+ @SafeVarargs
+ public Federator(Chain<? extends Processor>... chains) {
+ this(true, chains);
+ }
+
+ /**
+ * Federates over the given chains
+ *
+ * @param ordered true if the returned list should be ordered (default), false if it should be permissible
+ * to render the datalist from each federated source in the order it completes.
+ */
+ @SafeVarargs
+ public Federator(boolean ordered, Chain<? extends Processor>... chains) {
+ this.chains = Arrays.asList(chains);
+ this.ordered = ordered;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response = ordered ? new Response(request) : new Response(new UnorderedArrayDataList(request));
+
+ List<FutureResponse> futureResponses = new ArrayList<>(chains.size());
+ for (Chain<? extends Processor> chain : chains) {
+
+ futureResponses.add(new AsyncExecution(chain, execution).process(request.clone()));
+ }
+ AsyncExecution.waitForAll(futureResponses, 1000);
+ for (FutureResponse futureResponse : futureResponses) {
+ Response federatedResponse = futureResponse.get();
+ response.data().add(federatedResponse.data());
+ response.mergeWith(federatedResponse);
+ }
+ return response;
+ }
+ }
+
+ /**
+ * A federator which supports returning frozen data from each chain before the response is returned.
+ */
+ public static class EagerReturnFederator extends Processor {
+
+ private final List<Chain<? extends Processor>> chains;
+
+ private final boolean ordered;
+
+ /**
+ * Federates over the given chains. Returns an ordered response.
+ */
+ @SafeVarargs
+ public EagerReturnFederator(Chain<? extends Processor>... chains) {
+ this(true, chains);
+ }
+
+ /**
+ * Federates over the given chains
+ *
+ * @param ordered true if the returned list should be ordered (default), false if it should be permissible
+ * to render the datalist from each federated source in the order it completes.
+ */
+ @SafeVarargs
+ public EagerReturnFederator(boolean ordered, Chain<? extends Processor>... chains) {
+ this.chains = Arrays.asList(chains);
+ this.ordered = ordered;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ List<FutureResponse> futureResponses = new ArrayList<>(chains.size());
+ for (Chain<? extends Processor> chain : chains) {
+ futureResponses.add(new AsyncExecution(chain, execution).process(request.clone()));
+ }
+ AsyncExecution.waitForAll(futureResponses, 1000);
+ Response response = ordered ? new Response(request) : new Response(new UnorderedArrayDataList(request));
+ for (FutureResponse futureResponse : futureResponses) {
+ Response federatedResponse = futureResponse.get();
+ response.data().add(federatedResponse.data());
+ response.mergeWith(federatedResponse);
+ }
+ return response;
+ }
+ }
+
+ /**
+ * Adds a data element containing the (recursive) count of concrete (non-list) data elements in the response
+ */
+ public static class DataCounter extends Processor {
+
+ private String prefix = "";
+
+ public DataCounter() {
+ }
+
+ /**
+ * The prefix "[name] " is prepended to the string data
+ */
+ public DataCounter(String name) {
+ prefix = "[" + name + "] ";
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ int dataCount = countData(response.data());
+ response.data().add(new StringData(request, prefix + "Data count: " + dataCount));
+ return response;
+ }
+
+ private int countData(DataList<? extends Data> dataList) {
+ int count = 0;
+ for (Data data : dataList.asList()) {
+ if (data instanceof DataList)
+ count += countData((DataList<?>) data);
+ else
+ count++;
+ }
+ return count;
+ }
+ }
+
+ // TODO: Replace by below?
+ public static class FutureDataSource extends Processor {
+
+ /** The list of incoming data this has created */
+ public final List<IncomingData> incomingData = new ArrayList<>();
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ ArrayDataList dataList = ArrayDataList.createAsync(request);
+ incomingData.add(dataList.incoming());
+ return new Response(dataList);
+ }
+
+ }
+
+ /** Allows waiting for that request to happen. */
+ public static class ListenableFutureDataSource extends Processor {
+
+ private final boolean ordered, streamed;
+
+ /** The incoming data this has created */
+ public final SettableFuture<IncomingData> incomingData = SettableFuture.create();
+
+ /** Create an instance which returns ordered, streamable data */
+ public ListenableFutureDataSource() { this(true, true); }
+
+ public ListenableFutureDataSource(boolean ordered, boolean streamed) {
+ this.ordered = ordered;
+ this.streamed = streamed;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ ArrayDataList dataList;
+ if (! ordered)
+ dataList = ArrayDataList.createAsyncUnordered(request);
+ else if (! streamed)
+ dataList = ArrayDataList.createAsyncNonstreamed(request);
+ else
+ dataList = ArrayDataList.createAsync(request);
+ incomingData.set(dataList.incoming());
+ return new Response(dataList);
+ }
+
+ }
+
+ /** Allows waiting for that request to happen. */
+ public static class RequestCounter extends Processor {
+
+ /** The incoming data this has created */
+ public final SettableFuture<IncomingData> incomingData = SettableFuture.create();
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ ArrayDataList dataList = ArrayDataList.createAsync(request);
+ incomingData.set(dataList.incoming());
+ return new Response(dataList);
+ }
+
+ }
+
+ /**
+ * Multiples the amount of data returned by parallelism by performing parallel executions of the rest of the chain
+ */
+ public static class BlockingSplitter extends Processor {
+
+ private final int parallelism;
+
+ public BlockingSplitter(int parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ try {
+ // start executions in other threads
+ List<FutureResponse> futures = new ArrayList<>(parallelism - 1);
+ for (int i = 1; i < parallelism; i++) {
+ futures.add(new AsyncExecution(execution).process(request.clone()));
+ }
+
+ // complete this execution
+ Response response = execution.process(request);
+
+ // wait for other executions and merge the responses
+ for (Response additionalResponse : AsyncExecution.waitForAll(futures, 1000)) {
+ additionalResponse.data().complete().get(); // block until we have all the data elements
+ for (Object item : additionalResponse.data().asList())
+ response.data().add((Data) item);
+ response.mergeWith(additionalResponse);
+ }
+ return response;
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ /**
+ * Registers an async processing of the chain given in the constructor on completion of the data in the response
+ */
+ public static class AsyncDataProcessingInitiator extends Processor {
+
+ private final Chain<Processor> asyncChain;
+
+ public AsyncDataProcessingInitiator(Chain<Processor> asyncChain) {
+ this.asyncChain = asyncChain;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ // TODO: Consider for to best provide helpers for this
+ response.data().complete().addListener(new RunnableExecution(request,
+ new ExecutionWithResponse(asyncChain, response, execution)),
+ MoreExecutors.sameThreadExecutor());
+ return response;
+ }
+
+ }
+
+ /**
+ * Registers a chain to be invoked each time new data becomes available in the first child list
+ */
+ public static class StreamProcessingInitiator extends Processor {
+
+ private final Chain<Processor> streamChain;
+
+ public StreamProcessingInitiator(Chain<Processor> streamChain) {
+ this.streamChain = streamChain;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ // TODO: Consider for to best provide helpers for this
+ response.data().addDataListener(new RunnableExecution(request,
+ new ExecutionWithResponse(streamChain, response, execution)));
+ return response;
+ }
+
+ }
+
+ /**
+ * A processor which on invocation prints the string given on construction
+ */
+ public static class Echo extends Processor {
+
+ private String s;
+
+ public Echo(String s) {
+ this.s = s;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ System.out.println(s);
+ return execution.process(request);
+ }
+
+ }
+
+ /**
+ * A processor which adds a StringData item containing the string given in the constructor to every response
+ */
+ public static class StringDataAdder extends Processor {
+
+ private String string;
+
+ public StringDataAdder(String string) {
+ this.string = string;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ response.data().add(new StringData(request, string));
+ return response;
+ }
+
+ }
+
+ /**
+ * A processor which adds an ErrorMessage to the request of the top level
+ * data of each returned response.
+ */
+ public static class ErrorAdder extends Processor {
+
+ private ErrorMessage errorMessage;
+
+ public ErrorAdder(ErrorMessage errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ response.data().request().errors().add(errorMessage);
+ return response;
+ }
+
+ }
+
+ /**
+ * A processor which adds a List of StringData items containing the strings given in the constructor to every response
+ */
+ public static class StringDataListAdder extends Processor {
+
+ private String[] strings;
+
+ public StringDataListAdder(String... strings) {
+ this.strings = strings;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response = execution.process(request);
+ DataList<StringData> list = ArrayDataList.create(request);
+ for (String string : strings)
+ list.add(new StringData(request, string));
+ response.data().add(list);
+ return response;
+ }
+
+ }
+
+ /**
+ * Adds a the given trace message at the given trace level
+ */
+ public static class Trace extends Processor {
+
+ private String traceMessage;
+ private int traceLevel;
+
+ public Trace(String traceMessage, int traceLevel) {
+ this.traceMessage = traceMessage;
+ this.traceLevel = traceLevel;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ execution.trace().trace(traceMessage, traceLevel);
+ return execution.process(request);
+ }
+
+ }
+
+ public static final class StatusSetter extends Processor {
+
+ private final int status;
+
+ public StatusSetter(int status) {
+ this.status = status;
+ }
+
+ @Override
+ public com.yahoo.processing.Response process(com.yahoo.processing.Request request, Execution execution) {
+ request.errors().add(new ErrorMessage(status, ""));
+ return execution.process(request);
+ }
+
+ }
+
+ /**
+ * Adds (key, value) to the log value trace.
+ */
+ public static class LogValueAdder extends Processor {
+ private final String key;
+ private final String value;
+
+ public LogValueAdder(String key, String value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ execution.trace().logValue(key, value);
+ return execution.process(request);
+ }
+ }
+}
diff --git a/processing/src/main/java/com/yahoo/processing/test/Responses.java b/processing/src/main/java/com/yahoo/processing/test/Responses.java
new file mode 100644
index 00000000000..bd795770b87
--- /dev/null
+++ b/processing/src/main/java/com/yahoo/processing/test/Responses.java
@@ -0,0 +1,32 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test;
+
+import com.yahoo.processing.response.Data;
+import com.yahoo.processing.response.DataList;
+
+/**
+ * Static utilities
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ * @since 5.1.13
+ */
+public class Responses {
+
+ /**
+ * Returns a data item as a recursively indented string
+ */
+ public static String recursiveToString(Data data) {
+ StringBuilder b = new StringBuilder();
+ asString(data, b, "");
+ return b.toString();
+ }
+
+ private static void asString(Data data, StringBuilder b, String indent) {
+ b.append(indent).append(data).append("\n");
+ if (!(data instanceof DataList)) return;
+ for (Data childData : ((DataList<? extends Data>) data).asList()) {
+ asString(childData, b, indent.concat(" "));
+ }
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/ResponseTestCase.java b/processing/src/test/java/com/yahoo/processing/ResponseTestCase.java
new file mode 100644
index 00000000000..4578299adff
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/ResponseTestCase.java
@@ -0,0 +1,139 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing;
+
+import com.yahoo.processing.response.ArrayDataList;
+import com.yahoo.processing.response.DataList;
+import com.yahoo.processing.test.ProcessorLibrary;
+import com.yahoo.processing.test.Responses;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author bratseth
+ */
+@SuppressWarnings("unchecked")
+public class ResponseTestCase {
+
+ /**
+ * Create a nested async tree of data elements, complete it recursively and check completion order.
+ * Check the recursive toString printing along the way.
+ * List variable names ends by numbers specifying the index of the list at each level.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRecursiveCompletionAndToString() throws InterruptedException, ExecutionException {
+ // create lists
+ Request request = new Request();
+ DataList list1 = ArrayDataList.create(request);
+ DataList list11 = ArrayDataList.create(request);
+ DataList list12 = ArrayDataList.createAsync(request);
+ DataList list13 = ArrayDataList.createAsync(request);
+ DataList list14 = ArrayDataList.create(request);
+ DataList list121 = ArrayDataList.createAsync(request);
+ DataList list122 = ArrayDataList.create(request);
+ DataList list123 = ArrayDataList.createAsync(request);
+ DataList list1231 = ArrayDataList.createAsync(request);
+ DataList list1232 = ArrayDataList.create(request);
+ // wire tree
+ list1.add(list11);
+ list1.add(list12);
+ list1.add(list13);
+ list1.add(list14);
+ list12.add(list121);
+ list12.add(list122);
+ list12.add(list123);
+ list123.add(list1231);
+ list123.add(list1232);
+ // add sync data elements
+ list1.add(new ProcessorLibrary.StringData(request,"list1"));
+ list12.add(new ProcessorLibrary.StringData(request,"list12"));
+ list14.add(new ProcessorLibrary.StringData(request,"list14"));
+ list122.add(new ProcessorLibrary.StringData(request,"list122"));
+ list1231.add(new ProcessorLibrary.StringData(request,"list1231"));
+
+ assertEqualsIgnoreObjectNumbers("Uncompleted tree, no incoming",uncompletedTreeUncompletedIncoming,Responses.recursiveToString(list1));
+
+ // provide all async incoming data
+ list12.incoming().markComplete();
+ list121.incoming().addLast(new ProcessorLibrary.StringData(request,"list121async1"));
+ list123.incoming().markComplete();
+ list1231.incoming().add(new ProcessorLibrary.StringData(request,"list13231async1"));
+ list1231.incoming().addLast(new ProcessorLibrary.StringData(request,"list1231async2"));
+ list13.incoming().add(new ProcessorLibrary.StringData(request,"list13async1"));
+ list13.incoming().addLast(new ProcessorLibrary.StringData(request,"list13async2"));
+
+ assertEqualsIgnoreObjectNumbers("Uncompleted tree, incoming complete",uncompletedTreeCompletedIncoming, Responses.recursiveToString(list1));
+
+ // complete all
+ Response.recursiveComplete(list1).get();
+ assertEqualsIgnoreObjectNumbers("Completed tree",completedTree,Responses.recursiveToString(list1));
+ }
+
+ private void assertEqualsIgnoreObjectNumbers(String explanation,String expected,String actual) {
+ assertEquals(explanation,expected,removeObjectNumbers(actual));
+ }
+
+ /** Removes all object numbers (occurrences of @hexnumber) */
+ private String removeObjectNumbers(String s) {
+ return s.replaceAll("@[0-9a-f]+","");
+ }
+
+ private static final String uncompletedTreeUncompletedIncoming=
+ "com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " list122\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" +
+ " list1231\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " list12\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " list14\n" +
+ " list1\n";
+
+ private static final String uncompletedTreeCompletedIncoming=
+ "com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data []]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data [list121async1]]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " list122\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data []]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data [list13231async1, list1231async2]]\n" +
+ " list1231\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " list12\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data [list13async1, list13async2]]\n" +
+ " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" +
+ " list14\n" +
+ " list1\n";
+
+ private static final String completedTree=
+ "com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " list121async1\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " list122\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " list1231\n" +
+ " list13231async1\n" +
+ " list1231async2\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " list12\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " list13async1\n" +
+ " list13async2\n" +
+ " com.yahoo.processing.response.ArrayDataList [completed]\n" +
+ " list14\n" +
+ " list1\n";
+}
diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/AsyncExecutionTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/AsyncExecutionTestCase.java
new file mode 100644
index 00000000000..bdf04859151
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/execution/test/AsyncExecutionTestCase.java
@@ -0,0 +1,46 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution.test;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import org.junit.Test;
+
+import static com.yahoo.processing.test.ProcessorLibrary.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class AsyncExecutionTestCase {
+
+ /** Execute a processing chain which forks off into multiple threads */
+ @Test
+ public void testAsyncExecution() {
+ // Create a chain
+ Chain<Processor> chain=new Chain<>(new CombineData(),new BlockingSplitter(2),new Get6DataItems(), new DataSource());
+
+ // Execute it
+ Request request=new Request();
+ request.properties().set("appendage",1);
+ Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request);
+
+ // Verify the result
+ assertEquals(6*2-1,response.data().asList().size());
+ assertEquals("first.2, third.2",response.data().get(0).toString());
+ assertEquals("second.2",response.data().get(1).toString());
+ assertEquals("first.3",response.data().get(2).toString());
+ assertEquals("second.3",response.data().get(3).toString());
+ assertEquals("third.3",response.data().get(4).toString());
+ // from the parallel execution
+ assertEquals("first.2",response.data().get(5).toString());
+ assertEquals("second.2",response.data().get(6).toString());
+ assertEquals("third.2",response.data().get(7).toString());
+ assertEquals("first.3",response.data().get(8).toString());
+ assertEquals("second.3",response.data().get(9).toString());
+ assertEquals("third.3",response.data().get(10).toString());
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/ExecutionContextTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/ExecutionContextTestCase.java
new file mode 100644
index 00000000000..f5b3121f2f8
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/execution/test/ExecutionContextTestCase.java
@@ -0,0 +1,96 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution.test;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.test.ProcessorLibrary;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class ExecutionContextTestCase extends junit.framework.TestCase {
+
+ private Chain<Processor> chain=new Chain<Processor>(new ProcessorLibrary.DataSource());
+
+ /** Tests combined use of trace messages, context values and access log entries */
+ public void testtrace() {
+ Execution execution1=Execution.createRoot(chain,2,Execution.Environment.createEmpty());
+ execution1.trace().setProperty("a","a1");
+ execution1.trace().logValue("a","a1");
+ execution1.trace().trace("root 1", 2);
+ execution1.trace().setProperty("a","a2");
+ execution1.trace().setProperty("b","b1");
+ execution1.trace().logValue("a","a2");
+ execution1.trace().logValue("b","b1");
+
+ Execution execution2=new Execution(chain,execution1);
+ execution2.trace().setProperty("b","b2");
+ execution2.trace().logValue("b","b2");
+ execution2.trace().trace(" child-1 1", 2);
+ execution2.trace().setProperty("b", "b3");
+ execution2.trace().logValue("b","b3");
+
+ execution1.trace().setProperty("b","b4");
+ execution1.trace().logValue("b","b4");
+
+ Execution execution3=new Execution(chain,execution1);
+ execution3.trace().setProperty("b","b5");
+ execution3.trace().setProperty("c","c1");
+ execution3.trace().logValue("b","b5");
+ execution3.trace().logValue("c","c1");
+ execution3.trace().trace(" child-2 1", 2);
+
+ execution2.trace().setProperty("c","c2");
+ execution2.trace().logValue("c","c2");
+
+ execution1.trace().trace("root 2", 2);
+ execution3.trace().setProperty("d", "d1");
+ execution1.trace().logValue("d","d1");
+
+ execution2.trace().trace(" child-1 2", 2);
+ execution2.trace().setProperty("c", "c3");
+ execution2.trace().logValue("c","c3");
+
+ execution1.trace().setProperty("c","c4");
+ execution1.trace().logValue("c","c4");
+
+ Iterator<String> traceIterator=execution1.trace().traceNode().root().descendants(String.class).iterator();
+ assertEquals("root 1",traceIterator.next());
+ assertEquals(" child-1 1",traceIterator.next());
+ assertEquals(" child-1 2",traceIterator.next());
+ assertEquals(" child-2 1",traceIterator.next());
+ assertEquals("root 2",traceIterator.next());
+ assertFalse(traceIterator.hasNext());
+
+ // Verify context variables
+ assertEquals("a2", execution1.trace().getProperty("a"));
+ assertEquals("b5", execution1.trace().getProperty("b"));
+ assertEquals("c4", execution1.trace().getProperty("c"));
+ assertEquals("d1", execution1.trace().getProperty("d"));
+ assertNull(execution1.trace().getProperty("e"));
+
+ // Verify access log
+ Set<String> logValues=new HashSet<>();
+ for (Iterator<Execution.Trace.LogValue> logValueIterator=execution1.trace().logValueIterator(); logValueIterator.hasNext(); )
+ logValues.add(logValueIterator.next().toString());
+ assertEquals(12,logValues.size());
+ assertTrue(logValues.contains("a=a1"));
+ assertTrue(logValues.contains("a=a2"));
+ assertTrue(logValues.contains("b=b1"));
+ assertTrue(logValues.contains("b=b2"));
+ assertTrue(logValues.contains("b=b3"));
+ assertTrue(logValues.contains("b=b4"));
+ assertTrue(logValues.contains("b=b5"));
+ assertTrue(logValues.contains("c=c1"));
+ assertTrue(logValues.contains("c=c2"));
+ assertTrue(logValues.contains("d=d1"));
+ assertTrue(logValues.contains("c=c3"));
+ assertTrue(logValues.contains("c=c4"));
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java
new file mode 100644
index 00000000000..09d7c38322f
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java
@@ -0,0 +1,173 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution.test;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.response.DataList;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static com.yahoo.processing.test.ProcessorLibrary.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests scenarios where a data producer returns a promise of some future data rather than the data itself.
+ * As no processor waits for the data it is returned all the way to the caller.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class FutureDataTestCase {
+
+ /** Run a chain which ends in a processor which returns a response containing future data. */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testFutureDataPassThrough() throws InterruptedException, ExecutionException, TimeoutException {
+ // Set up
+ FutureDataSource futureDataSource=new FutureDataSource();
+ Chain<Processor> chain=new Chain<>(new DataCounter(),futureDataSource);
+
+ // Execute
+ Request request=new Request();
+ Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request); // Urk ...
+
+ // Verify the result prior to completion of delayed data
+ assertEquals(1,response.data().asList().size());
+ assertEquals("Data count: 0",response.data().get(0).toString());
+
+ // complete delayed data
+ assertEquals("Delayed data was requested once", 1, futureDataSource.incomingData.size());
+ futureDataSource.incomingData.get(0).add(new StringData(request, "d1"));
+ futureDataSource.incomingData.get(0).addLast(new StringData(request, "d2"));
+ assertEquals("New data is not visible because we haven't asked for it", 1, response.data().asList().size());
+ response.data().complete().get(1000, TimeUnit.MILLISECONDS);
+ assertEquals("Now the data is available", 3, response.data().asList().size());
+ assertEquals("d1",response.data().get(1).toString().toString());
+ assertEquals("d2",response.data().get(2).toString().toString());
+ }
+
+ /** Federate to one source which returns data immediately and one who return future data */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testFederateSyncAndAsyncData() throws InterruptedException, ExecutionException, TimeoutException {
+ // Set up
+ FutureDataSource futureDataSource=new FutureDataSource();
+ Chain<Processor> chain=new Chain<>(new DataCounter(),new Federator(new Chain<>(new DataSource()),new Chain<>(futureDataSource)));
+
+ // Execute
+ Request request=new Request();
+ request.properties().set("appendage",1);
+ Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request);
+
+ // Verify the result prior to completion of delayed data
+ assertEquals(3,response.data().asList().size()); // The sync data list + the (currently empty) future data list) + the data count
+ DataList syncData=(DataList)response.data().get(0);
+ DataList asyncData=(DataList)response.data().get(1);
+ StringData countData=(StringData)response.data().get(2);
+
+ assertEquals("The sync data is available",3,syncData.asList().size());
+ assertEquals( "first.1",syncData.get(0).toString());
+ assertEquals("second.1", syncData.get(1).toString());
+ assertEquals( "third.1",syncData.get(2).toString());
+ assertEquals("No async data yet",0,asyncData.asList().size());
+ assertEquals("The data counter has run and accessed the sync data","Data count: 3",countData.toString());
+
+ // complete async data
+ futureDataSource.incomingData.get(0).add(new StringData(request, "d1"));
+ futureDataSource.incomingData.get(0).addLast(new StringData(request, "d2"));
+ assertEquals("New data is not visible because we haven't asked for it", 0, asyncData.asList().size());
+ asyncData.complete().get(1000, TimeUnit.MILLISECONDS);
+ assertEquals("Now the data is available", 2, asyncData.asList().size());
+ assertEquals("d1",asyncData.get(0).toString().toString());
+ assertEquals("d2", asyncData.get(1).toString().toString());
+ }
+
+ /** Register a chain which will be called when some async data is available */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAsyncDataProcessing() throws InterruptedException, ExecutionException, TimeoutException {
+ // Set up
+ FutureDataSource futureDataSource=new FutureDataSource();
+ Chain<Processor> asyncChain=new Chain<Processor>(new DataCounter());
+ Chain<Processor> chain=new Chain<>(new AsyncDataProcessingInitiator(asyncChain),futureDataSource);
+
+ // Execute
+ Request request=new Request();
+ Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request);
+
+ // Verify the result prior to completion of delayed data
+ assertEquals("No data yet",0,response.data().asList().size());
+
+ // complete async data
+ futureDataSource.incomingData.get(0).add(new StringData(request, "d1"));
+ assertEquals("New data is not visible because it is not complete", 0, response.data().asList().size());
+ futureDataSource.incomingData.get(0).addLast(new StringData(request, "d2"));
+ assertEquals("Not visible because it has not been synced yet", 0, response.data().asList().size());
+ response.data().complete().get(1000, TimeUnit.MILLISECONDS);
+ assertEquals("Now the data as well as the count is available", 3, response.data().asList().size());
+ assertEquals("d1",response.data().get(0).toString().toString());
+ assertEquals("d2",response.data().get(1).toString().toString());
+ assertEquals("Data count: 2",response.data().get(2).toString());
+ }
+
+ /**
+ * Register a chain which federates over three sources, two of which are future.
+ * When the first of the futures are done one additional chain is to be run.
+ * When both are done another chain is to be run.
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testAsyncDataProcessingOfFederatedResult() throws InterruptedException, ExecutionException, TimeoutException {
+ // Set up
+ // Source 1 (async with completion chain)
+ FutureDataSource futureSource1=new FutureDataSource();
+ Chain<Processor> asyncChainSource1=new Chain<Processor>(new DataCounter("source1"));
+ Chain<Processor> chainSource1=new Chain<>(new AsyncDataProcessingInitiator(asyncChainSource1),futureSource1);
+ // Source 2 (async source)
+ FutureDataSource futureSource2=new FutureDataSource();
+ Chain<Processor> chainSource2=new Chain<Processor>(futureSource2);
+ // Source 3 (sync source)
+ Chain<Processor> chainSource3=new Chain<Processor>(new DataSource());
+ // Main chain federating to the above - not waiting for source 1 and 2 but invoking asyncMain when both are complete
+ Chain<Processor> asyncMain=new Chain<Processor>(new DataCounter("main"));
+ Chain<Processor> main=new Chain<>(new AsyncDataProcessingInitiator(asyncMain),new Federator(chainSource1,chainSource2,chainSource3));
+
+ // Execute
+ Request request=new Request();
+ Response response=Execution.createRoot(main,0,Execution.Environment.createEmpty()).process(request);
+
+ // Verify the result prior to completion of delayed data
+ assertEquals("We have the sync data plus placeholders for the async lists",3,response.data().asList().size());
+ DataList source1Data=((DataList)response.data().get(0));
+ DataList source2Data=((DataList)response.data().get(1));
+ DataList source3Data=((DataList)response.data().get(2));
+
+ assertEquals("No data yet",0,source1Data.asList().size());
+ assertEquals("No data yet",0,source2Data.asList().size());
+ assertEquals(3,source3Data.asList().size());
+
+ // complete async data in source1
+ futureSource1.incomingData.get(0).addLast(new StringData(request,"source1Data"));
+ assertEquals("Not visible yet", 0, source1Data.asList().size());
+ source1Data.complete().get(1000, TimeUnit.MILLISECONDS);
+ assertEquals(2, source1Data.asList().size());
+ assertEquals("source1Data",source1Data.get(0).toString());
+ assertEquals("Completion listener chain on this has run", "[source1] Data count: 1", source1Data.get(1).toString());
+
+ // source2 & main completion
+ assertEquals("Main completion listener has not run", 3, response.data().asList().size());
+ futureSource2.incomingData.get(0).addLast(new StringData(request, "source2Data"));
+ assertEquals("Main completion listener has not run", 3, response.data().asList().size());
+
+ Response.recursiveComplete(response.data()).get();
+ assertEquals("Main completion listener has run", 4, response.data().asList().size());
+ assertEquals("The main data counter saw all sync data, but not source2 data as it executes after this",
+ "[main] Data count: " + (2 + 0 + 3), response.data().get(3).toString());
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java
new file mode 100644
index 00000000000..1ce4e293104
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java
@@ -0,0 +1,107 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.execution.test;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.response.Data;
+import com.yahoo.processing.response.IncomingData;
+import com.yahoo.processing.test.ProcessorLibrary;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests listening on every available new piece of data in a response
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class StreamingTestCase {
+
+ /** Tests adding a chain which is called every time new data is added to a data list */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testStreamingData() throws InterruptedException, ExecutionException, TimeoutException {
+ // Set up
+ StreamProcessor streamProcessor = new StreamProcessor();
+ Chain<Processor> streamProcessing = new Chain<Processor>(streamProcessor);
+ ProcessorLibrary.FutureDataSource futureDataSource=new ProcessorLibrary.FutureDataSource();
+ Chain<Processor> main=new Chain<>(new ProcessorLibrary.DataCounter(),
+ new ProcessorLibrary.StreamProcessingInitiator(streamProcessing),
+ futureDataSource);
+
+ // Execute
+ Request request=new Request();
+ Response response= Execution.createRoot(main, 0, Execution.Environment.createEmpty()).process(request);
+ IncomingData incomingData = futureDataSource.incomingData.get(0);
+
+ // State prior to receiving any additional data
+ assertEquals(1,response.data().asList().size());
+ assertEquals("Data count: 0",response.data().get(0).toString());
+ assertEquals("Add data listener invoked also for DataCounter", 1, streamProcessor.invocationCount);
+ assertEquals("Initial data count", 1, response.data().asList().size());
+
+ // add first data - we have no listener so the data is held in the incoming buffer
+ incomingData.add(new ProcessorLibrary.StringData(request, "d1"));
+ assertEquals("Data add listener not invoked as we are not listening on new data yet",1, streamProcessor.invocationCount);
+ assertEquals("New data is not consumed", 1, response.data().asList().size());
+
+ // start listening on incoming data - this is what a renderer will do
+ incomingData.addNewDataListener(new MockNewDataListener(incomingData), MoreExecutors.sameThreadExecutor());
+ assertEquals("We got a data add event for the data which was already added", 2, streamProcessor.invocationCount);
+ assertEquals("New data is consumed", 2, response.data().asList().size());
+
+ incomingData.add(new ProcessorLibrary.StringData(request, "d2"));
+ assertEquals("We are now getting data add events each time", 3, streamProcessor.invocationCount);
+ assertEquals("New data is consumed", 3, response.data().asList().size());
+
+ incomingData.addLast(new ProcessorLibrary.StringData(request, "d3"));
+ assertEquals("We are getting data add events also the last time", 4, streamProcessor.invocationCount);
+ assertEquals("New data is consumed", 4, response.data().asList().size());
+
+ response.data().complete().get(1000, TimeUnit.MILLISECONDS); // no-op here
+ assertEquals("d1",response.data().get(1).toString().toString());
+ assertEquals("d2",response.data().get(2).toString().toString());
+ assertEquals("d3",response.data().get(3).toString().toString());
+ }
+
+ private static class MockNewDataListener implements Runnable {
+
+ private final IncomingData<Data> incomingData;
+
+ public MockNewDataListener(IncomingData<Data> incomingData) {
+ this.incomingData = incomingData;
+ }
+
+ @Override
+ public void run() {
+ // consume new data
+ for (Data newData : incomingData.drain()) {
+ incomingData.getOwner().add(newData);
+ }
+ // actual rendering would go here (at this point data add listeners will have executed)
+ }
+
+ }
+
+ private static class StreamProcessor extends Processor {
+
+ int invocationCount;
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ invocationCount++;
+ return execution.process(request);
+ }
+
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/request/CompoundNameTestCase.java b/processing/src/test/java/com/yahoo/processing/request/CompoundNameTestCase.java
new file mode 100644
index 00000000000..0d180bb6dbb
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/request/CompoundNameTestCase.java
@@ -0,0 +1,158 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request;
+
+import static org.junit.Assert.*;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Splitter;
+import com.yahoo.text.Lowercase;
+
+/**
+ * Module local test of the basic property name building block.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class CompoundNameTestCase {
+
+ private static final String NAME = "com.yahoo.processing.request.CompoundNameTestCase";
+ private CompoundName cn;
+
+ @Before
+ public void setUp() throws Exception {
+ cn = new CompoundName(NAME);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public final void testLast() {
+ assertEquals(NAME.substring(NAME.lastIndexOf('.') + 1), cn.last());
+ }
+
+ @Test
+ public final void testFirst() {
+ assertEquals(NAME.substring(0, NAME.indexOf('.')), cn.first());
+ }
+
+ @Test
+ public final void testRest() {
+ assertEquals(NAME.substring(NAME.indexOf('.') + 1), cn.rest().toString());
+ }
+
+ @Test
+ public final void testRestN() {
+ assertEquals("a.b.c.d.e", new CompoundName("a.b.c.d.e").rest(0).toString());
+ assertEquals("b.c.d.e", new CompoundName("a.b.c.d.e").rest(1).toString());
+ assertEquals("c.d.e", new CompoundName("a.b.c.d.e").rest(2).toString());
+ assertEquals("d.e", new CompoundName("a.b.c.d.e").rest(3).toString());
+ assertEquals("e", new CompoundName("a.b.c.d.e").rest(4).toString());
+ assertEquals("", new CompoundName("a.b.c.d.e").rest(5).toString());
+ }
+
+ @Test
+ public final void testPrefix() {
+ assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName("")));
+ assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName("a")));
+ assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b")));
+ assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b.c")));
+
+ assertFalse(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b.c.d")));
+ assertFalse(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b.d")));
+ }
+
+ @Test
+ public final void testSize() {
+ Splitter s = Splitter.on('.');
+ Iterable<String> i = s.split(NAME);
+ int n = 0;
+ for (@SuppressWarnings("unused") String x : i) {
+ ++n;
+ }
+ assertEquals(n, cn.size());
+ }
+
+ @Test
+ public final void testGet() {
+ String s = cn.get(0);
+ assertEquals(NAME.substring(0, NAME.indexOf('.')), s);
+ }
+
+ @Test
+ public final void testIsCompound() {
+ assertTrue(cn.isCompound());
+ }
+
+ @Test
+ public final void testIsEmpty() {
+ assertFalse(cn.isEmpty());
+ }
+
+ @Test
+ public final void testAsList() {
+ List<String> l = cn.asList();
+ Splitter peoplesFront = Splitter.on('.');
+ Iterable<String> answer = peoplesFront.split(NAME);
+ Iterator<String> expected = answer.iterator();
+ for (int i = 0; i < l.size(); ++i) {
+ assertEquals(expected.next(), l.get(i));
+ }
+ assertFalse(expected.hasNext());
+ }
+
+ @Test
+ public final void testEqualsObject() {
+ assertFalse(cn.equals(NAME));
+ assertFalse(cn.equals(null));
+ assertTrue(cn.equals(cn));
+ assertTrue(cn.equals(new CompoundName(NAME)));
+ }
+
+ @Test
+ public final void testEmptyNonEmpty() {
+ assertTrue(new CompoundName("").isEmpty());
+ assertEquals(0, new CompoundName("").size());
+ assertFalse(new CompoundName("a").isEmpty());
+ assertEquals(1, new CompoundName("a").size());
+ CompoundName empty = new CompoundName("a.b.c");
+ assertTrue(empty == empty.rest(0));
+ assertFalse(empty == empty.rest(1));
+ }
+
+ @Test
+ public final void testGetLowerCasedName() {
+ assertEquals(Lowercase.toLowerCase(NAME), cn.getLowerCasedName());
+ }
+
+ @Test
+ public void testAppend() {
+ assertEquals(new CompoundName("a.b.c.d"), new CompoundName("").append(new CompoundName("a.b.c.d")));
+ assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a").append(new CompoundName("b.c.d")));
+ assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a.b").append(new CompoundName("c.d")));
+ assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a.b.c").append(new CompoundName("d")));
+ assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a.b.c.d").append(new CompoundName("")));
+ }
+
+ @Test
+ public void empty_CompoundName_is_prefix_of_any_CompoundName() {
+ CompoundName empty = new CompoundName("");
+
+ assertTrue(empty.hasPrefix(empty));
+ assertTrue(new CompoundName("a").hasPrefix(empty));
+ }
+
+ @Test
+ public void whole_components_must_match_to_be_prefix() {
+ CompoundName stringPrefix = new CompoundName("a");
+ CompoundName name = new CompoundName("aa");
+
+ assertFalse(name.hasPrefix(stringPrefix));
+ }
+}
diff --git a/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameBenchmark.java b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameBenchmark.java
new file mode 100644
index 00000000000..6c512c82903
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameBenchmark.java
@@ -0,0 +1,52 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request.test;
+
+import com.yahoo.processing.request.CompoundName;
+
+/**
+ * @author balder
+ */
+public class CompoundNameBenchmark {
+ public void run() {
+ long result=0;
+ String strings[] = createStrings(1000);
+ // Warm-up
+ out("Warming up...");
+ for (int i=0; i<10*1000; i++)
+ result+=createCompundName(strings);
+
+ long startTime=System.currentTimeMillis();
+ out("Running...");
+ for (int i=0; i<100*1000; i++)
+ result+=createCompundName(strings);
+ out("Ignore this: " + result); // Make sure we are not fooled by optimization by creating an observable result
+ long endTime=System.currentTimeMillis();
+ out("Compoundification 1000 strings 100.000 times took " + (endTime-startTime) + " ms");
+ }
+
+ private final String [] createStrings(int num) {
+ String strings [] = new String [num];
+ for(int i=0; i < strings.length; i++) {
+ strings[i] = "this.is.a.short.compound.name." + i;
+ }
+ return strings;
+ }
+
+ private final int createCompundName(String [] strings) {
+ int retval = 0;
+ for (int i=0; i < strings.length; i++) {
+ CompoundName n = new CompoundName(strings[i]);
+ retval += n.size();
+ }
+ return retval;
+ }
+
+ private void out(String string) {
+ System.out.println(string);
+ }
+
+ public static void main(String[] args) {
+ new CompoundNameBenchmark().run();
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameTestCase.java
new file mode 100644
index 00000000000..647000c5f88
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameTestCase.java
@@ -0,0 +1,66 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request.test;
+
+import com.yahoo.processing.request.CompoundName;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+/**
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class CompoundNameTestCase {
+
+ @Test
+ public void testFirstRest() {
+ assertEquals(CompoundName.empty, CompoundName.empty.rest());
+
+ CompoundName n=new CompoundName("on.two.three");
+ assertEquals("on", n.first());
+ assertEquals("two.three", n.rest().toString());
+ n=n.rest();
+ assertEquals("two", n.first());
+ assertEquals("three", n.rest().toString());
+ n=n.rest();
+ assertEquals("three", n.first());
+ assertEquals("", n.rest().toString());
+ n=n.rest();
+ assertEquals("", n.first());
+ assertEquals("", n.rest().toString());
+ n=n.rest();
+ assertEquals("", n.first());
+ assertEquals("", n.rest().toString());
+ }
+
+ @Test
+ public void testHashCodeAndEquals() {
+ CompoundName n1 = new CompoundName("venn.d.a");
+ CompoundName n2 = new CompoundName(n1.asList());
+ assertEquals(n1.hashCode(), n2.hashCode());
+ assertEquals(n1, n2);
+ }
+
+ @Test
+ public void testAppend() {
+ assertEquals("a",new CompoundName("a").append("").toString());
+ assertEquals("a",new CompoundName("").append("a").toString());
+ assertEquals("a.b",new CompoundName("a").append("b").toString());
+
+ CompoundName name = new CompoundName("a.b");
+ assertEquals("a.b.c",name.append("c").toString());
+ assertEquals("a.b.d",name.append("d").toString());
+ }
+
+ @Test
+ public void testEmpty() {
+ CompoundName empty=new CompoundName("");
+ assertEquals("", empty.toString());
+ assertEquals(0, empty.asList().size());
+ }
+
+ @Test
+ public void testAsList() {
+ assertEquals("[one]", new CompoundName("one").asList().toString());
+ assertEquals("[one, two, three]", new CompoundName("one.two.three").asList().toString());
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/request/test/ErrorMessageTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/ErrorMessageTestCase.java
new file mode 100644
index 00000000000..82fd25f9754
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/request/test/ErrorMessageTestCase.java
@@ -0,0 +1,54 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request.test;
+
+import com.yahoo.processing.Request;
+import com.yahoo.processing.request.ErrorMessage;
+import org.junit.Test;
+
+/**
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class ErrorMessageTestCase extends junit.framework.TestCase {
+
+ @Test
+ public void testToString() {
+ assertEquals("message",new ErrorMessage("message").toString());
+ assertEquals("message: hello",new ErrorMessage("message",new Exception("hello")).toString());
+ assertEquals("message: detail",new ErrorMessage("message","detail").toString());
+ assertEquals("37: message: detail",new ErrorMessage(37,"message","detail").toString());
+ assertEquals("message: detail: hello",new ErrorMessage("message","detail",new Exception("hello")).toString());
+ assertEquals("message: detail: hello: world",new ErrorMessage("message","detail",new Exception("hello",new Exception("world"))).toString());
+ assertEquals("message: detail: hello: Exception",new ErrorMessage("message","detail",new Exception("hello",new Exception())).toString());
+ assertEquals("message: detail: hello",new ErrorMessage("message","detail",new Exception(new Exception("hello"))).toString());
+ assertEquals("message: detail: java.lang.Exception: Exception",new ErrorMessage("message","detail",new Exception(new Exception())).toString());
+ }
+
+ @Test
+ public void testAccessors() {
+ ErrorMessage m = new ErrorMessage(37,"message","detail",new Exception("hello"));
+ assertEquals(37,m.getCode());
+ assertEquals("message",m.getMessage());
+ assertEquals("detail",m.getDetailedMessage());
+ assertEquals("hello",m.getCause().getMessage());
+ }
+
+ @Test
+ public void testEquality() {
+ assertEquals(new ErrorMessage(37,"message","detail",new Exception("hello")),
+ new ErrorMessage(37,"message","detail",new Exception("hello")));
+ assertEquals(new ErrorMessage("message","detail",new Exception("hello")),
+ new ErrorMessage("message","detail",new Exception("hello")));
+ assertEquals(new ErrorMessage("message",new Exception("hello")),
+ new ErrorMessage("message",new Exception("hello")));
+ assertEquals(new ErrorMessage("message"),
+ new ErrorMessage("message"));
+ assertEquals(new ErrorMessage("message",new Exception()),
+ new ErrorMessage("message"));
+ assertFalse(new ErrorMessage("message").equals(new ErrorMessage("message","detail")));
+ assertFalse(new ErrorMessage(37,"message").equals(new ErrorMessage("message")));
+ assertFalse(new ErrorMessage(37,"message").equals(new ErrorMessage(38,"message")));
+ assertFalse(new ErrorMessage("message","detail1").equals(new ErrorMessage("message","detail2")));
+ assertFalse(new ErrorMessage("message1").equals(new ErrorMessage("message2")));
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/request/test/PropertyMapTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/PropertyMapTestCase.java
new file mode 100644
index 00000000000..61ef75c55a2
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/request/test/PropertyMapTestCase.java
@@ -0,0 +1,81 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request.test;
+
+import com.yahoo.processing.request.properties.PropertyMap;
+import com.yahoo.processing.request.properties.PublicCloneable;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class PropertyMapTestCase extends junit.framework.TestCase {
+
+ public void testCloning() {
+ PropertyMap map=new PropertyMap();
+ map.set("clonable",new ClonableObject());
+ map.set("publicClonable",new PublicClonableObject());
+ map.set("nonclonable",new NonClonableObject());
+ map.set("clonableArray",new ClonableObject[] {new ClonableObject()});
+ map.set("publicClonableArray",new ClonableObject[] {new ClonableObject()});
+ map.set("nonclonableArray",new NonClonableObject[] {new NonClonableObject()});
+ map.set("clonableList", Collections.singletonList(new ClonableObject()));
+ map.set("nonclonableList", Collections.singletonList(new NonClonableObject()));
+ assertNotNull(map.get("clonable"));
+ assertNotNull(map.get("nonclonable"));
+
+ PropertyMap mapClone=map.clone();
+ assertTrue(map.get("clonable") != mapClone.get("clonable"));
+ assertTrue(map.get("publicClonable")!= mapClone.get("publicClonable"));
+ assertTrue(map.get("nonclonable") == mapClone.get("nonclonable"));
+
+ assertTrue(map.get("clonableArray") != mapClone.get("clonableArray"));
+ assertTrue(first(map.get("clonableArray")) != first(mapClone.get("clonableArray")));
+ assertTrue(map.get("publicClonableArray") != mapClone.get("publicClonableArray"));
+ assertTrue(first(map.get("publicClonableArray")) != first(mapClone.get("publicClonableArray")));
+ assertTrue(first(map.get("nonclonableArray")) == first(mapClone.get("nonclonableArray")));
+ }
+
+ private Object first(Object object) {
+ if (object instanceof Object[])
+ return ((Object[])object)[0];
+ if (object instanceof List)
+ return ((List<?>)object).get(0);
+ throw new IllegalArgumentException();
+ }
+
+ public static class ClonableObject implements Cloneable {
+
+ @Override
+ public ClonableObject clone() {
+ try {
+ return (ClonableObject)super.clone();
+ }
+ catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ public static class PublicClonableObject implements PublicCloneable<PublicClonableObject> {
+
+ @Override
+ public PublicClonableObject clone() {
+ try {
+ return (PublicClonableObject)super.clone();
+ }
+ catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private static class NonClonableObject {
+
+ }
+
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/request/test/RequestTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/RequestTestCase.java
new file mode 100644
index 00000000000..7045ea1efbd
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/request/test/RequestTestCase.java
@@ -0,0 +1,137 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.request.test;
+
+import com.yahoo.processing.Request;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.processing.request.ErrorMessage;
+import com.yahoo.processing.request.Properties;
+import com.yahoo.processing.request.properties.PropertyMap;
+import org.junit.Test;
+
+/**
+ * Tests using requests
+ *
+ * @author bratseth
+ */
+public class RequestTestCase extends junit.framework.TestCase {
+
+ @Test
+ public void testProperties() {
+ Properties p = new PropertyMap();
+ p.set("a", "a1");
+ Request r = new Request(p);
+ r.properties().set("b", "b1");
+ assertEquals(2, r.properties().listProperties().size());
+ assertEquals("a1", r.properties().get("a"));
+
+ assertEquals("b1", r.properties().get("b"));
+ assertEquals("b1", r.properties().get("b", "default"));
+ assertEquals("default", r.properties().get("c", "default"));
+ assertNull(r.properties().get("c"));
+ assertEquals("b1", r.properties().get(new CompoundName("b")));
+ assertEquals("b1", r.properties().get(new CompoundName("b"), "default"));
+ assertEquals("default", r.properties().get(new CompoundName("c"), "default"));
+ assertNull(r.properties().get(new CompoundName("c")));
+
+ assertEquals("b1",r.properties().getString("b"));
+ assertEquals("b1",r.properties().getString("b","default"));
+ assertEquals("default",r.properties().getString("c","default"));
+ assertEquals(null,r.properties().getString("c"));
+ assertEquals("b1",r.properties().getString(new CompoundName("b")));
+ assertEquals("b1",r.properties().getString(new CompoundName("b"),"default"));
+ assertEquals("default",r.properties().getString(new CompoundName("c"),"default"));
+ assertEquals(null,r.properties().getString(new CompoundName("c")));
+
+ r.properties().set("i",7);
+ assertEquals(7,(int)r.properties().getInteger("i"));
+ assertEquals(7,(int)r.properties().getInteger("i",3));
+ assertEquals(3,(int)r.properties().getInteger("n",3));
+ assertNull(r.properties().getInteger("n"));
+ assertEquals(7,(int)r.properties().getInteger(new CompoundName("i")));
+ assertEquals(7,(int)r.properties().getInteger(new CompoundName("i"),3));
+ assertEquals(3,(int)r.properties().getInteger(new CompoundName("n"),3));
+ assertNull(r.properties().getInteger("n"));
+
+ r.properties().set(new CompoundName("l"),7);
+ assertEquals(7, (long) r.properties().getLong("l"));
+ assertEquals(7,(long)r.properties().getLong("l",3l));
+ assertEquals(3,(long)r.properties().getLong("m",3l));
+ assertNull(r.properties().getInteger("m"));
+ assertEquals(7,(long)r.properties().getLong(new CompoundName("l")));
+ assertEquals(7,(long)r.properties().getLong(new CompoundName("l"),3l));
+ assertEquals(3,(long)r.properties().getLong(new CompoundName("m"),3l));
+ assertNull(r.properties().getInteger("m"));
+
+ r.properties().set("d",7.3);
+ assertEquals(7.3,r.properties().getDouble("d"));
+ assertEquals(7.3,r.properties().getDouble("d",3.4d));
+ assertEquals(3.4,r.properties().getDouble("f",3.4d));
+ assertNull(r.properties().getDouble("f"));
+ assertEquals(7.3,r.properties().getDouble(new CompoundName("d")));
+ assertEquals(7.3,r.properties().getDouble(new CompoundName("d"),3.4d));
+ assertEquals(3.4,r.properties().getDouble(new CompoundName("f"),3.4d));
+ assertNull(r.properties().getDouble("f"));
+
+ r.properties().set("o",true);
+ assertEquals(true,r.properties().getBoolean("o"));
+ assertEquals(true,r.properties().getBoolean("o",true));
+ assertEquals(true,r.properties().getBoolean("g",true));
+ assertEquals(false, r.properties().getBoolean("g"));
+ assertEquals(true,r.properties().getBoolean(new CompoundName("o")));
+ assertEquals(true,r.properties().getBoolean(new CompoundName("o"),true));
+ assertEquals(true,r.properties().getBoolean(new CompoundName("g"),true));
+ assertEquals(false, r.properties().getBoolean("g"));
+
+ r.properties().set(new CompoundName("x.y"), "x1.y1");
+ r.properties().set("x.z", "x1.z1");
+
+ assertEquals(8, r.properties().listProperties().size());
+ assertEquals(0, r.properties().listProperties("a").size());
+ assertEquals(0, r.properties().listProperties(new CompoundName("a")).size());
+ assertEquals(0, r.properties().listProperties(new CompoundName("none")).size());
+ assertEquals(2, r.properties().listProperties(new CompoundName("x")).size());
+ assertEquals(2, r.properties().listProperties("x").size());
+ }
+
+ @Test
+ public void testErrorMessages() {
+ Request r = new Request();
+ r.errors().add(new ErrorMessage("foo"));
+ r.errors().add(new ErrorMessage("bar"));
+ assertEquals(2,r.errors().size());
+ assertEquals("foo",r.errors().get(0).getMessage());
+ assertEquals("bar",r.errors().get(1).getMessage());
+
+ }
+
+ @Test
+ public void testCloning() {
+ Request request = new Request();
+ request.properties().set("a","a1");
+ request.properties().set("b","b1");
+ request.errors().add(new ErrorMessage("foo"));
+ request.errors().add(new ErrorMessage("bar"));
+ Request rcloned = request.clone();
+ rcloned.properties().set("c", "c1");
+ rcloned.errors().add(new ErrorMessage("baz"));
+ request.properties().set("d", "d1");
+ request.errors().add(new ErrorMessage("boz"));
+
+ assertEquals("a1",request.properties().get("a"));
+ assertEquals("a1",rcloned.properties().get("a"));
+ assertEquals("b1",request.properties().get("b"));
+ assertEquals("b1",rcloned.properties().get("b"));
+ assertEquals(null,request.properties().get("c"));
+ assertEquals("c1",rcloned.properties().get("c"));
+ assertEquals("d1",request.properties().get("d"));
+ assertEquals(null,rcloned.properties().get("d"));
+
+ assertEquals(3,request.errors().size());
+ assertEquals(1,rcloned.errors().size());
+ assertEquals("foo",request.errors().get(0).getMessage());
+ assertEquals("bar",request.errors().get(1).getMessage());
+ assertEquals("boz",request.errors().get(2).getMessage());
+ assertEquals("baz",rcloned.errors().get(0).getMessage());
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/test/DocumentationTestCase.java b/processing/src/test/java/com/yahoo/processing/test/DocumentationTestCase.java
new file mode 100644
index 00000000000..ea51e9079cc
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/test/DocumentationTestCase.java
@@ -0,0 +1,44 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.test.documentation.AsyncDataProcessingInitiator;
+import com.yahoo.processing.test.documentation.AsyncDataProducer;
+import com.yahoo.processing.test.documentation.ExampleProcessor;
+import com.yahoo.processing.test.documentation.Federator;
+
+/**
+ * See to it we can actually run the examples in the doc.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class DocumentationTestCase {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public final void test() {
+ Processor p = new ExampleProcessor();
+ Chain<Processor> basic = new Chain<>(p);
+ Processor initiator = new AsyncDataProcessingInitiator(basic);
+ Chain<Processor> postProcessing = new Chain<>(initiator);
+ Execution e = Execution.createRoot(postProcessing, 0, Execution.Environment.createEmpty());
+ Response r = e.process(new Request());
+ // just adds a listener to the result returned from basic
+ assertEquals(0, r.data().asList().size());
+ Processor producer = new AsyncDataProducer();
+ Chain<Processor> asyncChain = new Chain<>(producer);
+ Processor federator = new Federator(basic, asyncChain);
+ e = Execution.createRoot(federator, 0, Execution.Environment.createEmpty());
+ r = e.process(new Request());
+ assertEquals(2, r.data().asList().size());
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/test/ProcessingTestCase.java b/processing/src/test/java/com/yahoo/processing/test/ProcessingTestCase.java
new file mode 100644
index 00000000000..4f306773c2a
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/test/ProcessingTestCase.java
@@ -0,0 +1,60 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import org.junit.Test;
+
+import static com.yahoo.processing.test.ProcessorLibrary.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the basic of the processing framework
+ */
+public class ProcessingTestCase {
+
+ /** Execute three simple processors doing some phony processing */
+ @Test
+ public void testChainedProcessing1() {
+ // Create a chain
+ Chain<Processor> chain=new Chain<>(new CombineData(),new Get6DataItems(), new DataSource());
+
+ // Execute it
+ Request request=new Request();
+ request.properties().set("appendage",1);
+ Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request);
+
+ // Verify the result
+ assertEquals(6-1,response.data().asList().size());
+ assertEquals("first.2, third.2",response.data().get(0).toString());
+ assertEquals("second.2",response.data().get(1).toString());
+ assertEquals("first.3",response.data().get(2).toString());
+ assertEquals("second.3",response.data().get(3).toString());
+ assertEquals("third.3",response.data().get(4).toString());
+ }
+
+ /** Execute the same processors in a different order */
+ @Test
+ public void testChainedProcessing2() {
+ // Create a chain
+ Chain<Processor> chain=new Chain<>(new Get6DataItems(),new CombineData(), new DataSource());
+
+ // Execute it
+ Request request=new Request();
+ request.properties().set("appendage",1);
+ Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request);
+
+ // Check the result
+ assertEquals(6,response.data().asList().size());
+ assertEquals("first.2, third.2",response.data().get(0).toString());
+ assertEquals("second.2",response.data().get(1).toString());
+ assertEquals("first.4, third.4",response.data().get(2).toString());
+ assertEquals("second.4",response.data().get(3).toString());
+ assertEquals("first.6, third.6",response.data().get(4).toString());
+ assertEquals("second.6",response.data().get(5).toString());
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java
new file mode 100644
index 00000000000..afda8a7fe96
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test.documentation;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.*;
+import com.yahoo.processing.execution.*;
+
+/**
+ * A processor which registers a listener on the future completion of
+ * asynchronously arriving data to perform another chain at that point.
+ */
+public class AsyncDataProcessingInitiator extends Processor {
+
+ private final Chain<Processor> asyncChain;
+
+ public AsyncDataProcessingInitiator(Chain<Processor> asyncChain) {
+ this.asyncChain=asyncChain;
+ }
+
+ @Override
+ public Response process(Request request, Execution execution) {
+ Response response=execution.process(request);
+ response.data().complete().addListener(new RunnableExecution(request,
+ new ExecutionWithResponse(asyncChain, response, execution)),
+ MoreExecutors.sameThreadExecutor());
+ return response;
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProducer.java b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProducer.java
new file mode 100644
index 00000000000..f2a51e240cc
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProducer.java
@@ -0,0 +1,37 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test.documentation;
+
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.response.ArrayDataList;
+import com.yahoo.processing.response.DataList;
+import com.yahoo.processing.response.IncomingData;
+import com.yahoo.processing.test.ProcessorLibrary.StringData;
+
+/**
+ * A data producer which producer data which will receive asynchronously.
+ * This is not a realistic, thread safe implementation as only the incoming data
+ * from the last created incoming data can be completed.
+ */
+public class AsyncDataProducer extends Processor {
+
+ private IncomingData incomingData;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ DataList dataList = ArrayDataList.createAsync(request); // Default implementation
+ incomingData=dataList.incoming();
+ return new Response(dataList);
+ }
+
+ /** Called by some other data producing thread, later */
+ @SuppressWarnings("unchecked")
+ public void completeLateData() {
+ incomingData.addLast(new StringData(incomingData.getOwner().request(),
+ "A late hello, world!"));
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/ExampleProcessor.java b/processing/src/test/java/com/yahoo/processing/test/documentation/ExampleProcessor.java
new file mode 100644
index 00000000000..c87d508676d
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/test/documentation/ExampleProcessor.java
@@ -0,0 +1,25 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test.documentation;
+
+import com.yahoo.processing.*;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.test.ProcessorLibrary.StringData;
+
+public class ExampleProcessor extends Processor {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ // Process the Request:
+ request.properties().set("foo","bar");
+
+ // Pass it down the chain to get a response
+ Response response=execution.process(request);
+
+ // process the response
+ response.data().add(new StringData(request,"Hello, world!"));
+
+ return response;
+ }
+
+}
diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/Federator.java b/processing/src/test/java/com/yahoo/processing/test/documentation/Federator.java
new file mode 100644
index 00000000000..c69bdf0c85c
--- /dev/null
+++ b/processing/src/test/java/com/yahoo/processing/test/documentation/Federator.java
@@ -0,0 +1,44 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.processing.test.documentation;
+
+import com.yahoo.component.chain.Chain;
+import com.yahoo.processing.Processor;
+import com.yahoo.processing.Request;
+import com.yahoo.processing.Response;
+import com.yahoo.processing.execution.AsyncExecution;
+import com.yahoo.processing.execution.Execution;
+import com.yahoo.processing.response.FutureResponse;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Call a number of chains in parallel
+ */
+public class Federator extends Processor {
+
+ private final List<Chain<? extends Processor>> chains;
+
+ @SafeVarargs
+ public Federator(Chain<? extends Processor> ... chains) {
+ this.chains= Arrays.asList(chains);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Response process(Request request, Execution execution) {
+ List<FutureResponse> futureResponses=new ArrayList<>(chains.size());
+ for (Chain<? extends Processor> chain : chains) {
+ futureResponses.add(new AsyncExecution(chain,execution).process(request));
+ }
+ Response response=execution.process(request);
+ AsyncExecution.waitForAll(futureResponses,1000);
+ for (FutureResponse futureResponse : futureResponses) {
+ Response federatedResponse=futureResponse.get();
+ response.data().add(federatedResponse.data());
+ response.mergeWith(federatedResponse);
+ }
+ return response;
+ }
+}