diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /processing |
Publish
Diffstat (limited to 'processing')
52 files changed, 5234 insertions, 0 deletions
diff --git a/processing/.gitignore b/processing/.gitignore new file mode 100644 index 00000000000..3cc25b51fc4 --- /dev/null +++ b/processing/.gitignore @@ -0,0 +1,2 @@ +/pom.xml.build +/target diff --git a/processing/OWNERS b/processing/OWNERS new file mode 100644 index 00000000000..31af040f698 --- /dev/null +++ b/processing/OWNERS @@ -0,0 +1 @@ +bratseth diff --git a/processing/pom.xml b/processing/pom.xml new file mode 100644 index 00000000000..3cc44b66e05 --- /dev/null +++ b/processing/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0"?> +<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.vespa</groupId> + <artifactId>parent</artifactId> + <version>6-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <artifactId>processing</artifactId> + <packaging>jar</packaging> + <version>6-SNAPSHOT</version> + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>provided-dependencies</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>chain</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>component</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> +</project> diff --git a/processing/src/main/java/com/yahoo/processing/Processor.java b/processing/src/main/java/com/yahoo/processing/Processor.java new file mode 100644 index 00000000000..9339d5792b7 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/Processor.java @@ -0,0 +1,42 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.yahoo.component.chain.ChainedComponent; +import com.yahoo.processing.execution.Execution; + +/** + * Superclass of chainable components processing Requests to create Responses. + * <p> + * Processors typically changes the Request and/or the Response. It may also make multiple + * forward requests, in series or parallel, or manufacture the response content itself or by calling + * an external service. + * <p> + * Typical usage: + * <code> + * public class MyProcessor extends Processor { + * + * @Override + * public Response process(Request request, Execution execution) { + * // process the request here + * Response response = execution.process(request); // Pass along to get the Response + * // process (or fill in) Data/DataList items on the response here + * return response; + * } + * + * } + * </code> + * + * @author bratseth + */ +public abstract class Processor extends ChainedComponent { + + /** + * Performs a processing request and returns the response + * + * @return a Response instance - never null - containing the data produced by this processor + * and those it forwards the request to + */ + public abstract Response process(Request request, Execution execution); + + +} diff --git a/processing/src/main/java/com/yahoo/processing/Request.java b/processing/src/main/java/com/yahoo/processing/Request.java new file mode 100644 index 00000000000..6a8c4211c79 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/Request.java @@ -0,0 +1,90 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.yahoo.component.provider.FreezableClass; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.request.Properties; +import com.yahoo.processing.request.properties.PropertyMap; + +import java.util.ArrayList; +import java.util.List; + +/** + * A generic processing request. + * The request contains a set of properties that are used to communicate information from the client making the + * processing request (e.g http parameters), and as a blackboard to pass information between processors. + * + * @author bratseth + */ +public class Request extends FreezableClass implements Cloneable { + + private Properties properties; + + /** + * The errors encountered while processing this request + */ + private List<ErrorMessage> errors = new ArrayList<>(0); + + /** + * The name of the chain of Processor instances which will be invoked when + * executing a request. + */ + public static final CompoundName CHAIN = new CompoundName("chain"); + + /** + * The name of the request property used in the processing framework to + * store the incoming JDisc request. + */ + public static final CompoundName JDISC_REQUEST = new CompoundName("jdisc.request"); + + /** + * Creates a request with no properties + */ + public Request() { + this(new PropertyMap()); + } + + /** + * Create a request with the given properties. + * This Request gains ownership of the given properties and may edit them in the future. + * + * @param properties the properties owner by this + */ + public Request(Properties properties) { + this.properties = properties; + } + + /** + * Returns the properties set on this request. + * Processors may add properties to send messages to downstream processors. + */ + public Properties properties() { + return properties; + } + + /** + * Returns the list of errors encountered while processing this request, never null. + * This is a live reference to the modifiable list of errors of this. + */ + public List<ErrorMessage> errors() { + return errors; + } + + /** + * Returns a clone of this request. + * <p> + * The properties are logically deeply cloned such that changes to properties in the clone are independent. + * <p> + * The errors of the original request <b>are not</b> cloned into the new instance: + * It will have an empty list of errors. + */ + @Override + public Request clone() { + Request clone = (Request) super.clone(); + clone.properties = properties.clone(); + clone.errors = new ArrayList<>(0); + return clone; + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/Response.java b/processing/src/main/java/com/yahoo/processing/Response.java new file mode 100644 index 00000000000..3805311efba --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/Response.java @@ -0,0 +1,162 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.processing.execution.ResponseReceiver; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.ArrayDataList; +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.DataList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A Response to a Request. + * <p> + * A Response contains a list of Data items, which may (through Data implementations) contain payload data and/or + * further nested data lists. + * <p> + * Frameworks built on top of processing may subclass this to create a stricter definition of a response. + * Processors producing Responses should not create subclasses but should instead + * create additional instances/subclasses of Data. Such Processors should always create Response instances by calling + * execution.process(request), which will return an empty Response if there are no further processors in the chain. + * + * @author bratseth + */ +public class Response extends ListenableFreezableClass { + + private final static CompoundName freezeListenerKey =new CompoundName("processing.freezeListener"); + + private final DataList<?> data; + + /** + * Creates a request containing an empty array data list + */ + public Response(Request request) { + this(ArrayDataList.create(request)); + } + + /** + * Creates a response containing a list of data + */ + public Response(DataList<?> data) { + this.data = data; + + Runnable freezeListener = null; + Request request = data.request(); + if (request != null) // subclasses of DataList may not ensure this + freezeListener = (Runnable)request.properties().get(freezeListenerKey); + if (freezeListener != null) { + if (freezeListener instanceof ResponseReceiver) + ((ResponseReceiver)freezeListener).setResponse(this); + data.addFreezeListener(freezeListener, MoreExecutors.sameThreadExecutor()); + } + } + + /** + * Convenience constructor which adds the given error message to the given request + */ + public Response(Request request, ErrorMessage errorMessage) { + this(ArrayDataList.create(request)); + request.errors().add(errorMessage); + } + + /** + * Processors which merges another request into this must call this method to notify the response. + * This does not modify the data of either response. + */ + public void mergeWith(Response other) { + } + + /** + * Returns the top level list of data items of this response + */ + public DataList data() { + return data; + } + + // ------ static utilities ---------------------------------------------------------------------------- + + /** + * Returns a future in which the given data list and all lists nested within it are completed. + * The only use of the returned future is to call a get() method on it to complete the given dataList and + * all dataLists nested below it recursively. + * <p> + * Lists are completed in prefix, depth-first order. DataLists added after the point when this method is called + * will not be completed. + * + * @param rootDataList the list to complete recursively + * @return the future in which all data in and below this list is complete, as the given root dataList for convenience + */ + public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) { + List<ListenableFuture<DataList<D>>> futures = new ArrayList<>(); + collectCompletionFutures(rootDataList, futures); + return new CompleteAllOnGetFuture<D>(futures); + } + + @SuppressWarnings("unchecked") + private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<ListenableFuture<DataList<D>>> futures) { + futures.add((ListenableFuture<DataList<D>>) dataList.complete()); + for (D data : dataList.asList()) { + if (data instanceof DataList) + collectCompletionFutures((DataList<D>) data, futures); + } + } + + /** + * A future which on get calls get on all its given futures and sets the value returned from the + * first given future as its result. + */ + private static class CompleteAllOnGetFuture<D extends Data> extends AbstractFuture<DataList<D>> { + + private final List<ListenableFuture<DataList<D>>> futures; + + public CompleteAllOnGetFuture(List<ListenableFuture<DataList<D>>> futures) { + this.futures = new ArrayList<>(futures); + } + + @Override + public DataList<D> get() throws InterruptedException, ExecutionException { + DataList<D> result = null; + for (ListenableFuture<DataList<D>> future : futures) { + if (result == null) + result = future.get(); + else + future.get(); + } + set(result); + return result; + } + + @Override + public DataList<D> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + DataList<D> result = null; + long timeLeft = unit.toMillis(timeout); + long currentCallStart = SystemTimer.INSTANCE.milliTime(); + for (ListenableFuture<DataList<D>> future : futures) { + if (result == null) + result = future.get(timeLeft, TimeUnit.MILLISECONDS); + else + future.get(timeLeft, TimeUnit.MILLISECONDS); + long currentCallEnd = SystemTimer.INSTANCE.milliTime(); + timeLeft -= (currentCallEnd - currentCallStart); + if (timeLeft <= 0) break; + currentCallStart = currentCallEnd; + } + set(result); + return result; + } + + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/execution/AsyncExecution.java b/processing/src/main/java/com/yahoo/processing/execution/AsyncExecution.java new file mode 100644 index 00000000000..7915fbedb0d --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/AsyncExecution.java @@ -0,0 +1,157 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.component.chain.Chain; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.FutureResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; + +/** + * Provides asynchronous execution of processing chains. Usage: + * + * <pre> + * Execution execution = new Execution(chain); + * AsyncExecution asyncExecution = new AsyncExecution(execution); + * Future<Response> future = asyncExecution.process(request) + * try { + * result = future.get(timeout, TimeUnit.milliseconds); + * } catch(TimeoutException e) { + * // Handle timeout + * } + * </pre> + * + * <p> + * The request is not thread safe. A clone() must be made for each parallel processing. + * </p> + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + * @see Execution + */ +public class AsyncExecution { + + private static final ThreadFactory threadFactory = ThreadFactoryFactory.getThreadFactory("processing"); + + private static final Executor executorMain = createExecutor(); + private static final Executor createExecutor() { + ThreadPoolExecutor executor = new ThreadPoolExecutor(100, + Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(false), threadFactory); + // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also + // get the dreaded thread locals initialized even if they will never run. + // That counters what we we want to achieve with the Q that will prefer thread locality. + executor.prestartAllCoreThreads(); + return executor; + } + + /** + * The execution of this + */ + private final Execution execution; + + /** + * Create an async execution of a single processor + */ + public AsyncExecution(final Processor processor, Execution parent) { + this(new Execution(processor, parent)); + } + + /** + * Create an async execution of a chain + */ + public AsyncExecution(final Chain<? extends Processor> chain, Execution parent) { + this(new Execution(chain, parent)); + } + + /** + * Creates an async execution from an existing execution. This async + * execution will execute the chain from the given execution, starting + * from the next processor in that chain. This is handy to execute + * multiple executions to the rest of the chain in parallel. + * <p> + * The state of the given execution is read on construction of this and not + * used later - the argument execution can be reused for other purposes. + * + * @param execution the execution from which the state of this is created + */ + public AsyncExecution(final Execution execution) { + this.execution = new Execution(execution); + } + + /** + * Performs an async processing. Note that the given request cannot be simultaneously + * used in multiple such processings - a clone must be created for each. + */ + public FutureResponse process(final Request request) { + return getFutureResponse(new Callable<Response>() { + @Override + public Response call() { + return execution.process(request); + } + }, request); + } + + private static <T> Future<T> getFuture(final Callable<T> callable) { + final FutureTask<T> future = new FutureTask<>(callable); + executorMain.execute(future); + return future; + } + + private FutureResponse getFutureResponse(final Callable<Response> callable, final Request request) { + final FutureResponse future = new FutureResponse(callable, execution, request); + executorMain.execute(future.delegate()); + return future; + } + + /* + * Waits for all futures until the given timeout. If a FutureResult isn't + * done when the timeout expires, it will be cancelled, and it will return a + * response. All unfinished Futures will be cancelled. + * + * @return the list of responses in the same order as returned from the task collection + */ + // Note that this may also be achieved using guava Futures. Not sure if this should be deprecated because of it. + public static List<Response> waitForAll(final Collection<FutureResponse> tasks, final long timeout) { + + // Copy the list in case it is modified while we are waiting + final List<FutureResponse> workingTasks = new ArrayList<>(tasks); + @SuppressWarnings({"rawtypes", "unchecked"}) + final Future task = getFuture(new Callable() { + @Override + public List<Future> call() { + for (final FutureResponse task : workingTasks) { + task.get(); + } + return null; + } + }); + + try { + task.get(timeout, TimeUnit.MILLISECONDS); + } catch (final TimeoutException | InterruptedException | ExecutionException e) { + // Handle timeouts below + } + + final List<Response> responses = new ArrayList<>(tasks.size()); + for (final FutureResponse future : workingTasks) + responses.add(getTaskResponse(future)); + return responses; + } + + private static Response getTaskResponse(FutureResponse future) { + if (future.isDone() && !future.isCancelled()) { + return future.get(); // Since isDone() = true, this won't block. + } else { // Not done and no errors thrown + return new Response(future.getRequest(), new ErrorMessage("Timed out waiting for " + future)); + } + } + + +} diff --git a/processing/src/main/java/com/yahoo/processing/execution/Execution.java b/processing/src/main/java/com/yahoo/processing/execution/Execution.java new file mode 100644 index 00000000000..c8bc291c036 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/Execution.java @@ -0,0 +1,487 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.collections.Pair; +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.chain.ChainRegistry; +import com.yahoo.yolean.trace.TraceNode; +import com.yahoo.yolean.trace.TraceVisitor; + +import java.util.Iterator; + +/** + * An execution of a chain. This keeps tracks of the progress of the execution and is called by the + * processors (using {@link #process} to move the execution to the next one. + * + * @author <a href="bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class Execution { + + /** + * The index of the searcher in the chain which should be executed on the next call + * An Execution instance contains the state of a chain execution by + * providing a class stack for a chain - when a processor is called + * (through this), it will increment the index of the processor to call next, + * each time a processor returns (regardless of how) it will do the opposite. + */ + private int processorIndex; + + private final Chain<? extends Processor> chain; + + private final Trace trace; + + private final Environment<? extends Processor> environment; + + /** + * Creates an execution of a single processor + * + * @param processor the processor to execute in this + * @param execution the parent execution of this + */ + public Execution(Processor processor, Execution execution) { + this(new Chain<>(processor), execution); + } + + /** Creates an execution of a single processor which is not in the context of an existing execution */ + public static Execution createRoot(Processor processor, int traceLevel, Environment<? extends Processor> environment) { + return createRoot(new Chain<>(processor), traceLevel, environment); + } + + /** + * Creates an execution which is not in the context of an existing execution + * + * @param chain the chain to execute + * @param traceLevel the level to emit trace at + * @param environment the execution environment to use + */ + public static Execution createRoot(Chain<? extends Processor> chain, int traceLevel, + Environment<? extends Processor> environment) { + return new Execution(chain, 0, Trace.createRoot(traceLevel), environment); + } + + /** + * Creates an execution of a chain + * + * @param chain the chain to execute in this, starting from the first processor + * @param execution the parent execution of this + */ + public Execution(Chain<? extends Processor> chain, Execution execution) { + this(chain, 0, execution.trace().createChild(), execution.environment().nested()); + } + + /** + * Creates an execution from another. This execution will start at the next processor of the + * given execution. The given execution can continue independently of this. + */ + public Execution(Execution startPoint) { + this(startPoint.chain, startPoint.processorIndex, startPoint.trace.createChild(), startPoint.environment().nested()); + } + + /** + * Creates a new execution by setting the internal state directly. + * + * @param chain the chain to execute + * @param startIndex the start index into that chain + * @param trace the context <b>of this</b>. If this is created from an existing execution/context, + * be sure to do <code> new Context<COMPONENT>(startPoint.context)</code> first! + * @param environment the static execution environment to use + */ + protected Execution(Chain<? extends Processor> chain, int startIndex, Trace trace, Environment<? extends Processor> environment) { + if (chain == null) throw new NullPointerException("Chain cannot be null"); + this.chain = chain; + this.processorIndex = startIndex; + this.trace = trace; + this.environment = environment; + } + + /** + * Calls process on the next processor in this chain. If there is no next, an empty response is returned. + */ + public Response process(Request request) { + Processor processor = next(); + if (processor == null) + return defaultResponse(request); + + Response response = null; + try { + nextProcessor(); + onInvoking(request, processor); + response = processor.process(request, this); + if (response == null) + throw new NullPointerException(processor + " returned null, not a Response object"); + return response; + } finally { + previousProcessor(); + onReturning(request, processor, response); + } + } + + /** + * Returns the index into the chain of processors which is currently next + */ + protected int nextIndex() { + return processorIndex; + } + + /** + * A hook called when a processor is to be invoked. Overriding methods must call super.onInvoking + */ + protected void onInvoking(Request request, Processor next) { + if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) { + int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel(); + trace.trace("Invoke " + next, traceAt); + } + if (Trace.Level.Dependencies.includes(trace.getTraceLevel())) + trace.trace(next.getId() + " " + next.getDependencies().toString(), trace.getTraceLevel()); + } + + /** + * A hook called when a processor returns, either normally or by throwing. + * Overriding methods must call super.onReturning + * + * @param request the processing request + * @param processor the processor which returned + * @param response the response returned, or null if the processor returned by throwing + */ + protected void onReturning(Request request, Processor processor, Response response) { + if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) { + int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel(); + trace.trace("Return " + processor, traceAt); + } + } + + /** Move this execution to the previous processor */ + protected void previousProcessor() { + processorIndex--; + } + + /** Move this execution to the next processor */ + protected void nextProcessor() { + processorIndex++; + } + + /** Returns the next searcher to be invoked in this chain, or null if we are at the last */ + protected Processor next() { + if (chain.components().size() <= processorIndex) return null; + return chain.components().get(processorIndex); + } + + /** Returns the chain this executes */ + public Chain<? extends Processor> chain() { + return chain; + } + + /** + * Creates the default response to return from this kind of execution when there are no further processors. + * If this is overridden, make sure to propagate any freezeListener from this to the returned response + * top-level DataList. + */ + protected Response defaultResponse(Request request) { + return new Response(request); + } + + public String toString() { + StringBuilder s = new StringBuilder("Execution("); + s.append(chain.getId()); + s.append(")#").append(hashCode()); + return s.toString(); + } + + public Trace trace() { + return trace; + } + + public Environment<? extends Processor> environment() { + return environment; + } + + /** + * Holds the static execution environment for the duration of an execution + */ + public static class Environment<COMPONENT extends Processor> { + + private final ChainRegistry<COMPONENT> chainRegistry; + + /** + * Creates an empty environment. Only useful for some limited testing + */ + public static <C extends Processor> Environment<C> createEmpty() { + return new Environment<C>(new ChainRegistry<C>()); + } + + /** + * Returns an environment for an execution spawned from the execution having this environment. + */ + public Environment<COMPONENT> nested() { + return this; // this is immutable, subclasses might want to do something else though + } + + /** + * Creates a new environment + */ + public Environment(ChainRegistry<COMPONENT> chainRegistry) { + this.chainRegistry = chainRegistry; + } + + /** + * Returns the processing chain registry of this execution environment. + * The registry may be empty, but never null. + */ + public ChainRegistry<COMPONENT> chainRegistry() { + return chainRegistry; + } + + } + + /** + * Tre trace of this execution. This is a facade into a node in the larger trace tree which captures + * the information about all executions caused by some request + * + * @author <a href="bratseth@yahoo-inc.com">Jon Bratseth</a> + */ + public static class Trace { + + /** + * The node in the trace tree capturing this execution + */ + private TraceNode traceNode; + + /** + * The highest level of tracing this should record + */ + private int traceLevel; + + /** + * If true, do timing logic, even though trace level is low. + */ + private boolean forceTimestamps = false; + + /** + * Creates an empty root trace with a given level of tracing + */ + public static Trace createRoot(int traceLevel) { + return new Trace(traceLevel, new TraceNode(null, timestamp(traceLevel, false)), false); + } + + /** + * Creates a trace node below a parent + */ + public Trace createChild() { + TraceNode child = new TraceNode(null, timestamp(traceLevel, forceTimestamps)); + traceNode.add(child); + return new Trace(getTraceLevel(), child, forceTimestamps); + } + + /** + * Creates a new instance by assigning the internal state of this directly + */ + private Trace(int traceLevel, TraceNode traceNode, boolean forceTimestamps) { + this.traceLevel = traceLevel; + this.traceNode = traceNode; + this.forceTimestamps = forceTimestamps; + } + + /** + * Returns the maximum trace level this will record + */ + public int getTraceLevel() { + return traceLevel; + } + + /** + * Sets the maximum trace level this will record + */ + public void setTraceLevel(int traceLevel) { + this.traceLevel = traceLevel; + } + + public void setForceTimestamps(boolean forceTimestamps) { + this.forceTimestamps = forceTimestamps; + } + + public boolean getForceTimestamps() { + return forceTimestamps; + } + + /** + * Adds a trace message to this trace, if this trace has at most the given trace level + */ + public void trace(String message, int traceLevel) { + if (this.traceLevel >= traceLevel) { + traceNode.add(new TraceNode(message, timestamp(traceLevel, forceTimestamps))); + } + } + + /** + * Adds a key-value which will be logged to the access log of this request. + * Multiple values may be set to the same key. A value cannot be removed once set, + * but it can be overwritten by adding another value for the same key. + */ + public void logValue(String key, String value) { + traceNode.add(new TraceNode(new LogValue(key, value), 0)); + } + + /** + * Returns the values that should be written to the access log set in the entire trace node tree + */ + public Iterator<LogValue> logValueIterator() { + return traceNode.root().descendants(LogValue.class).iterator(); + } + + /** + * Visits the entire trace tree + * + * @return the argument visitor for convenience + */ + public <VISITOR extends TraceVisitor> VISITOR accept(VISITOR visitor) { + return traceNode.root().accept(visitor); + } + + /** + * Adds a property key-value to this trace. + * Values are looked up by reverse depth-first search in the trace node tree. + * + * @param name the name of the property + * @param value the value of the property, or null to set this property to null + */ + public void setProperty(String name, Object value) { + traceNode.add(new TraceNode(new Pair<>(name, value), 0)); + } + + /** + * Returns a property set anywhere in the trace tree this points to. + * Note that even though this call is itself "thread robust", the object values returned + * may in some scenarios not be written behind a synchronization barrier, so when accessing + * objects which are not inherently thread safe, synchronization should be considered. + * <p> + * This method have a time complexity which is proportional to + * the number of trace nodes in the tree + * + * @return the value of this property, or null if none + */ + public Object getProperty(String name) { + return accept(new PropertyValueVisitor(name)).foundValue(); + } + + /** + * Returns the trace node peer of this + */ + public TraceNode traceNode() { + return traceNode; + } + + /** + * Returns a short string description of this + */ + @Override + public String toString() { + return "trace: " + traceNode; + } + + private static long timestamp(int traceLevel, boolean forceTimestamps) { + return (forceTimestamps || Level.Timestamp.includes(traceLevel)) ? System.currentTimeMillis() : 0; + } + + /** + * Visits all trace nodes to collect the last set value of a particular property in a trace tree + */ + private static class PropertyValueVisitor extends TraceVisitor { + + /** + * The name of the property to find + */ + private String name; + private Object foundValue = null; + + public PropertyValueVisitor(String name) { + this.name = name; + } + + @Override + public void visit(TraceNode node) { + if (node.payload() == null) return; + if (!(node.payload() instanceof Pair)) return; + + Pair property = (Pair) node.payload(); + if (!property.getFirst().equals(name)) return; + foundValue = property.getSecond(); + } + + public Object foundValue() { + return foundValue; + } + + } + + /** + * An immutable access log value added to the trace + */ + public static class LogValue { + + private final String key; + private final String value; + + public LogValue(String key, String value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return key + "=" + value; + } + + } + + /** + * Defines what information is added at which trace level + */ + public enum Level { + + /** + * Every processing step initiated is traced + */ + Step(4), + /** + * All trace messages are timestamped + */ + Timestamp(6), + /** + * The before/after dependencies of each processing step is traced on every invocation + */ + Dependencies(7); + + /** + * The smallest trace level at which this information will be traced + */ + private int value; + + Level(int value) { + this.value = value; + } + + public int value() { + return value; + } + + /** + * Returns whether this level includes the given level, i.e whether traceLevel is this.value() or more + */ + public boolean includes(int traceLevel) { + return traceLevel >= this.value(); + } + + } + } +} diff --git a/processing/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java b/processing/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java new file mode 100644 index 00000000000..ceb5d4f7ccc --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java @@ -0,0 +1,36 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; + +/** + * An execution which has a response which is returned when this gets to the end of the chain. + * This is useful to run processing chains where a response exists up front, typically for on completion listeners. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class ExecutionWithResponse extends Execution { + + private Response response; + + /** + * Creates an execution which will return a given response at the end of the chain. + * + * @param chain the chain to execute in this + * @param response the response this will return from {@link #process} then the end of this chain is reached + * @param execution the the parent of this execution + */ + public ExecutionWithResponse(Chain<? extends Processor> chain, Response response, Execution execution) { + super(chain, execution); + this.response = response; + } + + @Override + protected Response defaultResponse(Request request) { + return response; + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java b/processing/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java new file mode 100644 index 00000000000..fbc6d0fb6d8 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java @@ -0,0 +1,17 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.processing.Response; + +/** + * An interface for classes which can be given responses. + * Freeze listeners may implement this to be handed the response + * before they are run. There is probably no other sensible use for this. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public interface ResponseReceiver { + + public void setResponse(Response response); + +} diff --git a/processing/src/main/java/com/yahoo/processing/execution/RunnableExecution.java b/processing/src/main/java/com/yahoo/processing/execution/RunnableExecution.java new file mode 100644 index 00000000000..617a2b98b03 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/RunnableExecution.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; + +/** + * An adaptor of an Execution to a runnable. Calling run on this causes process to be called on the + * given processor. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class RunnableExecution implements Runnable { + + private final Request request; + private final Execution execution; + private Response response = null; + private Throwable exception = null; + + public RunnableExecution(Request request, Execution execution) { + this.request = request; + this.execution = execution; + } + + /** + * Calls process on the execution of this. + * This will result in either response or exception being set on this. + * Calling this never throws an exception. + */ + public void run() { + try { + response = execution.process(request); + } catch (Exception e) { + exception = e; // TODO: Log + } + } + + /** + * Returns the response from executing this, or null if exception is set or run() has not been called yet + */ + public Response getResponse() { + return response; + } + + /** + * Returns the exception from executing this, or null if response is set or run() has not been called yet + */ + public Throwable getException() { + return exception; + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java b/processing/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java new file mode 100644 index 00000000000..e31b9a8b098 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution.chain; + +import com.yahoo.component.chain.Chain; +import com.yahoo.component.chain.ChainedComponent; +import com.yahoo.component.provider.ComponentRegistry; + +/** + * A registry of chains + * + * @author tonytv + * @since 5.1.7 + */ +public class ChainRegistry<T extends ChainedComponent> extends ComponentRegistry<Chain<T>> { +} diff --git a/processing/src/main/java/com/yahoo/processing/execution/chain/package-info.java b/processing/src/main/java/com/yahoo/processing/execution/chain/package-info.java new file mode 100644 index 00000000000..e2f299fa186 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/chain/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.execution.chain; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/processing/src/main/java/com/yahoo/processing/execution/package-info.java b/processing/src/main/java/com/yahoo/processing/execution/package-info.java new file mode 100644 index 00000000000..c3a3074bfb4 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/execution/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.execution; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/processing/src/main/java/com/yahoo/processing/package-info.java b/processing/src/main/java/com/yahoo/processing/package-info.java new file mode 100644 index 00000000000..4f0b2d4e724 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/package-info.java @@ -0,0 +1,10 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing; + +// TODO: +// - Look through all instances where we pass executor and consider if we should allow the caller to decide the thread +// - Should data listener use a typed interface rather than runnable` + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/processing/src/main/java/com/yahoo/processing/request/CompoundName.java b/processing/src/main/java/com/yahoo/processing/request/CompoundName.java new file mode 100644 index 00000000000..798d2d11f89 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/request/CompoundName.java @@ -0,0 +1,287 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static com.yahoo.text.Lowercase.toLowerCase; + +/** + * An immutable compound name of the general form "a.bb.ccc", + * where there can be any number of such compounds, including one or zero. + * <p> + * Using CompoundName is generally substantially faster than using strings. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public final class CompoundName { + + /** + * The string name of this compound. + */ + private final String name; + private final String lowerCasedName; + + private final ImmutableList<String> compounds; + + /** A hashcode which is always derived from the compounds (NEVER the string) */ + private final int hashCode; + + /** This name with the first component removed */ + private final CompoundName rest; + + /** The empty compound */ + public static final CompoundName empty = new CompoundName(""); + + /** + * Constructs this from a string which may contains dot-separated components + * + * @throws NullPointerException if name is null + */ + public CompoundName(final String name) { + this(name, parse(name)); + } + + /** Constructs this from an array of name components which are assumed not to contain dots */ + public static CompoundName fromComponents(String ... components) { + return new CompoundName(Arrays.asList(components)); + } + + /** Constructs this from a list of compounds. */ + public CompoundName(List<String> compounds) { + this(toCompoundString(compounds), compounds); + } + + /** + * Constructs this from a name with already parsed compounds. + * Private to avoid creating names with inconsistencies. + * + * @param name the string representation of the compounds + * @param compounds the compounds of this name + */ + private CompoundName(String name, List<String> compounds) { + if (name == null) throw new NullPointerException("Name can not be null"); + + this.name = name; + this.lowerCasedName = toLowerCase(name); + if (compounds.size()==1 && compounds.get(0).isEmpty()) + this.compounds = ImmutableList.of(); + else + this.compounds = ImmutableList.copyOf(compounds); + this.hashCode = this.compounds.hashCode(); + + int size = this.compounds.size(); + rest = size > 1 ? new CompoundName(compounds.subList(1, size)) + : size == 1 ? empty : this; // size==0 -> this needed during construction of empty + } + + private static List<String> parse(String s) { + ArrayList<String> l = new ArrayList<>(); + int p = 0; + final int m = s.length(); + for (int i = 0; i < m; i++) { + if (s.charAt(i) == '.') { + l.add(s.substring(p, i)); + p = i + 1; + } + } + if (p == 0) { + l.add(s); + } else if (p < m) { + l.add(s.substring(p, m)); + } else { + throw new IllegalArgumentException("'" + s + "' is not a legal compound name. Names can not end with a dot."); + } + return l; + } + + /** + * Returns a compound name which has the given compound string appended to it + * + * @param name if name is empty this returns <code>this</code> + */ + public CompoundName append(String name) { + if (name.isEmpty()) return this; + if (isEmpty()) return new CompoundName(name); + List<String> newCompounds = new ArrayList<>(compounds); + newCompounds.addAll(parse(name)); + return new CompoundName(concat(this.name, name), newCompounds); + } + + /** + * Returns a compound name which has the given compounds appended to it + * + * @param name if name is empty this returns <code>this</code> + */ + public CompoundName append(CompoundName name) { + if (name.isEmpty()) return this; + if (isEmpty()) return name; + List<String> newCompounds = new ArrayList<>(compounds); + newCompounds.addAll(name.compounds); + return new CompoundName(concat(this.name, name.name), newCompounds); + } + + private String concat(String name1, String name2) { + return name1 + "." + name2; + } + + /** + * Returns a compound name which has the given name components prepended to this name, + * in the given order, i.e new ComponentName("c").prepend("a","b") will yield "a.b.c". + * + * @param nameParts if name is empty this returns <code>this</code> + */ + public CompoundName prepend(String ... nameParts) { + if (nameParts.length == 0) return this; + if (isEmpty()) return fromComponents(nameParts); + + List<String> newCompounds = new ArrayList<>(); + for (String namePart : nameParts) + newCompounds.add(namePart); + newCompounds.addAll(this.compounds); + return new CompoundName(newCompounds); + } + + /** + * Returns the name after the last dot. If there are no dots, the full name is returned. + */ + public String last() { + if (compounds.isEmpty()) return ""; + return compounds.get(compounds.size() - 1); + } + + /** + * Returns the name before the first dot. If there are no dots the full name is returned. + */ + public String first() { + if (compounds.isEmpty()) return ""; + return compounds.get(0); + } + + /** + * Returns the first n components of this. + * + * @throws IllegalArgumentException if this does not have at least n components + */ + public CompoundName first(int n) { + if (compounds.size() < n) + throw new IllegalArgumentException("Asked for the first " + n + " components but '" + + this + "' only have " + compounds.size() + " components."); + return new CompoundName(compounds.subList(0, n)); + } + + /** + * Returns the name after the first dot, or "" if this name has no dots + */ + public CompoundName rest() { return rest; } + + /** + * Returns the name starting after the n first components (i.e dots). + * This may be the empty name. + * + * @throws IllegalArgumentException if this does not have at least that many components + */ + public CompoundName rest(int n) { + if (n == 0) return this; + if (compounds.size() < n) + throw new IllegalArgumentException("Asked for the rest after " + n + " components but '" + + this + "' only have " + compounds.size() + " components."); + if (n == 1) return rest(); + if (compounds.size() == n) return empty; + return rest.rest(n-1); + } + + /** + * Returns the number of compound elements in this. Which is exactly the number of dots in the string plus one. + * The size of an empty compound is 0. + */ + public int size() { + return compounds.size(); + } + + /** + * Returns the compound element as the given index + */ + public String get(int i) { + return compounds.get(i); + } + + /** + * Returns a compound which have the name component at index i set to the given name. + * As an optimization, if the given name == the name component at this index, this is returned. + */ + public CompoundName set(int i, String name) { + if (get(i) == name) return this; + List<String> newCompounds = new ArrayList<>(compounds); + newCompounds.set(i, name); + return new CompoundName(newCompounds); + } + + /** + * Returns whether this name has more than one element + */ + public boolean isCompound() { + return compounds.size() > 1; + } + + public boolean isEmpty() { + return compounds.isEmpty(); + } + + /** + * Returns whether the given name is a prefix of this. + * Prefixes are taken on the component, not character level, so + * "a" is a prefix of "a.b", but not a prefix of "ax.b + */ + public boolean hasPrefix(CompoundName prefix) { + if (prefix.size() > this.size()) return false; + + int prefixLength = prefix.name.length(); + if (prefixLength == 0) + return true; + + if (name.length() > prefixLength && name.charAt(prefixLength) != '.') + return false; + + return name.startsWith(prefix.name); + } + + /** + * Returns an immutable list of the components of this + */ + public List<String> asList() { + return compounds; + } + + @Override + public int hashCode() { return hashCode; } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof CompoundName)) return false; + CompoundName other = (CompoundName)o; + return this.name.equals(other.name); + } + + /** + * Returns the string representation of this - all the name components in order separated by dots. + */ + @Override + public String toString() { return name; } + + public String getLowerCasedName() { + return lowerCasedName; + } + + private static String toCompoundString(List<String> compounds) { + StringBuilder b = new StringBuilder(); + for (String compound : compounds) + b.append(compound).append("."); + return b.length()==0 ? "" : b.substring(0, b.length()-1); + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/request/ErrorMessage.java b/processing/src/main/java/com/yahoo/processing/request/ErrorMessage.java new file mode 100644 index 00000000000..15ef23b2d62 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/request/ErrorMessage.java @@ -0,0 +1,217 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +/** + * An error encountered while processing a request. + * This can be subclassed to add error messages containing more information. + * <p> + * Error messages are immutable. + * + * @author bratseth + */ +public class ErrorMessage implements Cloneable { + + private final int code; + private final String message; + private final String detailedMessage; + private final Throwable cause; + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + */ + public ErrorMessage(String message) { + this(0, message, null, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + */ + public ErrorMessage(int code, String message) { + this(code, message, null, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param details a longer detail description of this condition + */ + public ErrorMessage(String message, String details) { + this(0, message, details, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + * @param details a longer detail description of this condition + */ + public ErrorMessage(int code, String message, String details) { + this(code, message, details, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param cause the cause of this error + */ + public ErrorMessage(String message, Throwable cause) { + this(0, message, null, cause); + } + + /** + * Creates an error + * + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + * @param message the textual message describing this condition tersely + * @param cause the cause of this error + */ + public ErrorMessage(int code, String message, Throwable cause) { + this(code, message, null, cause); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param details a longer detail description of this condition + * @param cause the cause of this error + */ + public ErrorMessage(String message, String details, Throwable cause) { + this(0, message, details, cause); + } + + /** + * Creates an error + * + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + * @param message the textual message describing this condition tersely + * @param details a longer detail description of this condition + * @param cause the cause of this error + */ + public ErrorMessage(int code, String message, String details, Throwable cause) { + if (message == null) throw new NullPointerException("Message cannot be null"); + this.code = code; + this.message = message; + this.detailedMessage = details; + this.cause = cause; + } + + /** + * Returns the code of this message, or 0 if no code is set + */ + public int getCode() { + return code; + } + + /** + * Returns the error message, never null + */ + public String getMessage() { + return message; + } + + /** + * Returns detailed information about this error, or null if there is no detailed message + */ + public String getDetailedMessage() { + return detailedMessage; + } + + /** + * Returns the throwable associated with this error, or null if none + */ + public Throwable getCause() { + return cause; + } + + /** + * Returns a formatted message containing the information in this + */ + @Override + public String toString() { + if (code == 0 && detailedMessage == null && cause == null) return message; // shortcut + StringBuilder b = new StringBuilder(); + if (code != 0) + b.append(code).append(": "); + b.append(message); + if (detailedMessage != null) + b.append(": ").append(detailedMessage); + if (cause != null) + append(cause, b); + return b.toString(); + } + + private void append(Throwable t, StringBuilder b) { + String lastMessage = null; + String message; + for (; t != null; t = t.getCause(), lastMessage = message) { + message = getMessage(t); + if (message == null) continue; + if (lastMessage != null && lastMessage.equals(message)) continue; + if (b.length() > 0) + b.append(": "); + b.append(message); + } + } + + /** + * Returns a useful message from *this* exception, or null if none + */ + private static String getMessage(Throwable t) { + String message = t.getMessage(); + if (t.getCause() == null) { + if (message == null) return t.getClass().getSimpleName(); + } else { + if (message == null) return null; + if (message.equals(t.getCause().getClass().getName() + ": " + t.getCause().getMessage())) return null; + } + return message; + } + + @Override + public int hashCode() { + return code * 7 + message.hashCode() + (detailedMessage == null ? 0 : 17 * detailedMessage.hashCode()); + } + + /** + * Two error messages are equal if they have the same code and message. + * The cause is ignored in the comparison. + */ + @Override + public boolean equals(Object o) { + if (!(o instanceof ErrorMessage)) return false; + + ErrorMessage other = (ErrorMessage) o; + + if (this.code != other.code) return false; + + if (!this.message.equals(other.message)) return false; + + if (this.detailedMessage == null) return other.detailedMessage == null; + if (other.detailedMessage == null) return false; + + return this.detailedMessage.equals(other.detailedMessage); + } + + @Override + public ErrorMessage clone() { + try { + return (ErrorMessage) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Programming error"); + } + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/request/Properties.java b/processing/src/main/java/com/yahoo/processing/request/Properties.java new file mode 100644 index 00000000000..2c603b6c7a0 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/request/Properties.java @@ -0,0 +1,573 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * The properties of a request + * + * @author bratseth + */ +public class Properties implements Cloneable { + + private Properties chained = null; + + /** + * Sets the properties chained to this. + * + * @param chained the properties to chain to this, or null to make this the last in the chain + * @return the given chained object to allow setting up a chain by dotting in one statement + */ + public Properties chain(Properties chained) { + this.chained = chained; + return chained; + } + + /** + * Returns the properties chained to this, or null if this is the last in the chain + */ + public Properties chained() { + return chained; + } + + /** + * Returns the first instance of the given class in this chain, or null if none + */ + @SuppressWarnings("unchecked") + public final <T extends Properties> T getInstance(Class<T> propertyClass) { + if (propertyClass.isAssignableFrom(this.getClass())) return (T) this; + if (chained == null) return null; + return chained.getInstance(propertyClass); + } + + /** + * Lists all properties of this with no context, by delegating to listProperties("") + */ + public final Map<String, Object> listProperties() { + return listProperties(CompoundName.empty); + } + + /** + * Returns a snapshot of all properties of this - same as listProperties("",context) + */ + public final Map<String, Object> listProperties(Map<String, String> context) { + return listProperties(CompoundName.empty, context, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(CompoundName path) { + return listProperties(path, null, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(String path) { + return listProperties(new CompoundName(path), null, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(CompoundName path, Map<String, String> context) { + return listProperties(path, context, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(String path, Map<String, String> context) { + return listProperties(new CompoundName(path), context, this); + } + + /** + * Returns a snapshot of all properties of this having a given path prefix + * <p> + * Some sources of properties may not be list-able (e.g those using reflection) + * and will not be included in this snapshot. + * + * + * @param path the prefix (up to a ".") of the properties to return, or null or the empty string to return all properties + * @param context the context used to resolve the properties, or null if none + * @param substitution the properties which will be used to do string substitution in the values added to the map + */ + public Map<String, Object> listProperties(CompoundName path, Map<String, String> context, Properties substitution) { + if (path == null) + path = CompoundName.empty; + if (chained() == null) + return new HashMap<>(); + else + return chained().listProperties(path, context, this); + } + + /** + * Returns a snapshot of all properties of this having a given path prefix + * <p> + * Some sources of properties may not be list-able (e.g those using reflection) + * and will not be included in this snapshot. + * + * + * @param path the prefix (up to a ".") of the properties to return, or null or the empty string to return all properties + * @param context the context used to resolve the properties, or null if none + * @param substitution the properties which will be used to do string substitution in the values added to the map + */ + public final Map<String, Object> listProperties(String path, Map<String, String> context, Properties substitution) { + return listProperties(new CompoundName(path), context, substitution); + } + + /** + * Gets a named value which (if necessary) is resolved using a property context. + * + * @param name the name of the property to return + * @param context the variant resolution context, or null if none + * @param substitution the properties used to substitute in these properties, or null if none + */ + public Object get(CompoundName name, Map<String, String> context, Properties substitution) { + if (chained == null) return null; + return chained.get(name, context, substitution); + } + + /** + * Gets a named value which (if necessary) is resolved using a property context + * + * @param name the name of the property to return + * @param context the variant resolution context, or null if none + * @param substitution the properties used to substitute in these properties, or null if none + */ + public final Object get(String name, Map<String, String> context, Properties substitution) { + return get(new CompoundName(name), context, substitution); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,context,this) + */ + public final Object get(CompoundName name, Map<String, String> context) { + return get(name, context, this); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,context,this) + */ + public final Object get(String name, Map<String, String> context) { + return get(new CompoundName(name), context, this); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,null,this) + */ + public final Object get(CompoundName name) { + return get(name, null, this); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,null,this) + */ + public final Object get(String name) { + return get(new CompoundName(name), null, this); + } + + /** + * Gets a named value from the first chained instance which has one, + * or the default value if no value is set, or if the first value encountered is explicitly set to null. + * <p> + * This default implementation simply forwards to the chained instance, or returns the default if none + * + * + * @param name the name of the property to return + * @param defaultValue the default value returned if the value returned is null + */ + public final Object get(CompoundName name, Object defaultValue) { + Object value = get(name); + if (value == null) return defaultValue; + return value; + } + + /** + * Gets a named value from the first chained instance which has one, + * or the default value if no value is set, or if the first value encountered is explicitly set to null. + * <p> + * This default implementation simply forwards to the chained instance, or returns the default if none + * + * @param name the name of the property to return + * @param defaultValue the default value returned if the value returned is null + */ + public final Object get(String name, Object defaultValue) { + return get(new CompoundName(name), defaultValue); + } + + /** + * Sets a value to the first chained instance which accepts it. + * <p> + * This default implementation forwards to the chained instance or throws + * a RuntimeException if there is not chained instance. + * + * @param name the name of the value + * @param value the value to set. Setting a name to null explicitly is legal. + * @param context the context used to resolve where the values should be set, or null if none + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public void set(CompoundName name, Object value, Map<String, String> context) { + if (chained == null) throw new RuntimeException("Property '" + name + "->" + value + + "' was not accepted in this property chain"); + chained.set(name, value, context); + } + + /** + * Sets a value to the first chained instance which accepts it. + * <p> + * This default implementation forwards to the chained instance or throws + * a RuntimeException if there is not chained instance. + * + * @param name the name of the value + * @param value the value to set. Setting a name to null explicitly is legal. + * @param context the context used to resolve where the values should be set, or null if none + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void set(String name, Object value, Map<String, String> context) { + set(new CompoundName(name), value, context); + } + + /** + * Sets a value to the first chained instance which accepts it by calling set(name,value,null). + * + * @param name the name of the value + * @param value the value to set. Setting a name to null explicitly is legal. + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void set(CompoundName name, Object value) { + set(name, value, null); + } + + /** + * Sets a value to the first chained instance which accepts it by calling set(name,value,null). + * + * @param name the name of the value + * @param value the value to set. Setting a name to null explicitly is legal. + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void set(String name, Object value) { + set(new CompoundName(name), value, Collections.<String,String>emptyMap()); + } + + /** + * Gets a property as a boolean - if this value can reasonably be interpreted as a boolean, this will return + * the value. Returns false if this property is null. + */ + public final boolean getBoolean(CompoundName name) { + return getBoolean(name, false); + } + + /** + * Gets a property as a boolean - if this value can reasonably be interpreted as a boolean, this will return + * the value. Returns false if this property is null. + */ + public final boolean getBoolean(String name) { + return getBoolean(name, false); + } + + /** + * Gets a property as a boolean. + * This will return true only if the value is either the empty string, + * or any Object which has a toString which is case-insensitive equal to "true" + * + * @param defaultValue the value to return if this property is null + */ + public final boolean getBoolean(CompoundName key, boolean defaultValue) { + return asBoolean(get(key), defaultValue); + } + + /** + * Gets a property as a boolean. + * This will return true only if the value is either the empty string, + * or any Object which has a toString which is case-insensitive equal to "true" + * + * @param defaultValue the value to return if this property is null + */ + public final boolean getBoolean(String key, boolean defaultValue) { + return asBoolean(get(key), defaultValue); + } + + /** + * Converts a value to boolean - this will be true only if the value is either the empty string, + * or any Object which has a toString which is case-insensitive equal to "true" + */ + protected final boolean asBoolean(Object value, boolean defaultValue) { + if (value == null) return defaultValue; + + String s = value.toString(); + int sz = s.length(); + switch (sz) { + case 0: + return true; + case 4: + return ((s.charAt(0) | 0x20) == 't') && + ((s.charAt(1) | 0x20) == 'r') && + ((s.charAt(2) | 0x20) == 'u') && + ((s.charAt(3) | 0x20) == 'e'); + } + return false; + } + + /** + * Returns this property as a string + * + * @return this property as a string, or null if the property is null + */ + public final String getString(CompoundName key) { + return getString(key, null); + } + + /** + * Returns this property as a string + * + * @return this property as a string, or null if the property is null + */ + public final String getString(String key) { + return getString(key, null); + } + + /** + * Returns this property as a string + * + * @param key the property key + * @param defaultValue the value to return if this property is null + * @return this property as a string + */ + public final String getString(CompoundName key, String defaultValue) { + return asString(get(key), defaultValue); + } + + /** + * Returns this property as a string + * + * @param key the property key + * @param defaultValue the value to return if this property is null + * @return this property as a string + */ + public final String getString(String key, String defaultValue) { + return asString(get(key), defaultValue); + } + + protected final String asString(Object value, String defaultValue) { + if (value == null) return defaultValue; + return value.toString(); + } + + /** + * Returns a property as an Integer + * + * @return the integer value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but + * have a toString which is not parseable as a number + */ + public final Integer getInteger(CompoundName name) { + return getInteger(name, null); + } + + /** + * Returns a property as an Integer + * + * @return the integer value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but + * have a toString which is not parseable as a number + */ + public final Integer getInteger(String name) { + return getInteger(name, null); + } + + /** + * Returns a property as an Integer + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for the name + * @throws NumberFormatException if the given parameter does not exist + * or does not have a toString parseable as a number + */ + public final Integer getInteger(CompoundName name, Integer defaultValue) { + return asInteger(get(name), defaultValue); + } + + /** + * Returns a property as an Integer + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for the name + * @throws NumberFormatException if the given parameter does not exist + * or does not have a toString parseable as a number + */ + public final Integer getInteger(String name, Integer defaultValue) { + return asInteger(get(name), defaultValue); + } + + protected final Integer asInteger(Object value, Integer defaultValue) { + try { + if (value == null) + return defaultValue; + + if (value instanceof Integer) + return (Integer) value; + + String stringValue = value.toString(); + if (stringValue.isEmpty()) + return defaultValue; + + return new Integer(stringValue); + } catch (IllegalArgumentException e) { + throw new NumberFormatException("Not a valid integer"); + } + } + + /** + * Returns a property as a Long + * + * @return the long value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(CompoundName name) { + return getLong(name, null); + } + + /** + * Returns a property as a Long + * + * @return the long value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(String name) { + return getLong(name, null); + } + + /** + * Returns a property as a Long + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(CompoundName name, Long defaultValue) { + return asLong(get(name), defaultValue); + } + + /** + * Returns a property as a Long + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(String name, Long defaultValue) { + return asLong(get(name), defaultValue); + } + + protected final Long asLong(Object value, Long defaultValue) { + try { + if (value == null) + return defaultValue; + + if (value instanceof Long) + return (Long) value; + + String stringValue = value.toString(); + if (stringValue.isEmpty()) + return defaultValue; + + return new Long(value.toString()); + } catch (IllegalArgumentException e) { + throw new NumberFormatException("Not a valid long"); + } + } + + /** + * Returns a property as a Double + * + * @return the integer value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(CompoundName name) { + return getDouble(name, null); + } + + /** + * Returns a property as a Double + * + * @return the integer value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(String name) { + return getDouble(name, null); + } + + /** + * Returns a property as a Double + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(CompoundName name, Double defaultValue) { + return asDouble(get(name), defaultValue); + } + + /** + * Returns a property as a Double + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(String name, Double defaultValue) { + return asDouble(get(name), defaultValue); + } + + protected final Double asDouble(Object value, Double defaultValue) { + try { + if (value == null) + return defaultValue; + + if (value instanceof Double) + return (Double) value; + + String stringValue = value.toString(); + if (stringValue.isEmpty()) + return defaultValue; + + return new Double(value.toString()); + } catch (IllegalArgumentException e) { + throw new NumberFormatException("Not a valid double"); + } + } + + /** + * Clones this instance and recursively all chained instance. + * Implementations should call this and clone their own state as appropriate + */ + public Properties clone() { + try { + Properties clone = (Properties) super.clone(); + if (chained != null) + clone.chained = this.chained.clone(); + return clone; + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Will never happen"); + } + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/request/package-info.java b/processing/src/main/java/com/yahoo/processing/request/package-info.java new file mode 100644 index 00000000000..65b467c588c --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/request/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.request; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/processing/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java b/processing/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java new file mode 100644 index 00000000000..9a1441196f3 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java @@ -0,0 +1,146 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.properties; + +import com.yahoo.component.provider.FreezableClass; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.Properties; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.logging.Logger; + +/** + * A HashMap backing of Properties. + * <p> + * When this is cloned it will deep copy not only the model object map, but also each + * clonable member inside the map. + * <p> + * Subclassing is supported, a hook can be implemented to provide conditional inclusion in the map. + * By default - all properties are accepted, so set is never propagated. + * <p> + * This class is not multithread safe. + * + * @author bratseth + */ +public class PropertyMap extends Properties { + + private static Logger log = Logger.getLogger(PropertyMap.class.getName()); + + /** + * The properties of this + */ + private Map<CompoundName, Object> properties = new HashMap<>(); + + public void set(CompoundName name, Object value, Map<String, String> context) { + if (shouldSet(name, value)) + properties.put(name, value); + else + super.set(name, value, context); + } + + /** + * Return true if this value should be set in this map, false if the set should be propagated instead + * This default implementation always returns true. + */ + protected boolean shouldSet(CompoundName name, Object value) { + return true; + } + + public + @Override + Object get(CompoundName name, Map<String, String> context, + com.yahoo.processing.request.Properties substitution) { + if (!properties.containsKey(name)) return super.get(name, context, substitution); + return properties.get(name); + } + + public + @Override + PropertyMap clone() { + PropertyMap clone = (PropertyMap) super.clone(); + clone.properties = cloneMap(this.properties); + return clone; + } + + /** + * Clones a map by deep cloning each value which is cloneable and shallow copying all other values. + */ + public static Map<CompoundName, Object> cloneMap(Map<CompoundName, Object> map) { + Map<CompoundName, Object> cloneMap = new HashMap<>(); + for (Map.Entry<CompoundName, Object> entry : map.entrySet()) { + Object cloneValue = clone(entry.getValue()); + if (cloneValue == null) + cloneValue = entry.getValue(); // Shallow copy objects which does not support cloning + cloneMap.put(entry.getKey(), cloneValue); + } + return cloneMap; + } + + /** + * Clones this object if it is clonable, and the clone is public. Returns null if not + */ + public static Object clone(Object object) { + if (object == null) return null; + if (!(object instanceof Cloneable)) return null; + if (object instanceof Object[]) + return arrayClone((Object[]) object); + else + return objectClone(object); + } + + private static Object arrayClone(Object[] object) { + Object[] arrayClone = Arrays.copyOf(object, object.length); + // deep clone + for (int i = 0; i < arrayClone.length; i++) { + Object elementClone = clone(arrayClone[i]); + if (elementClone != null) + arrayClone[i] = elementClone; + } + return arrayClone; + } + + private static Object objectClone(Object object) { + // Fastpath for our own commonly used classes + if (object instanceof FreezableClass) { + // List common superclass of 'com.yahoo.search.result.Hit' + return ((FreezableClass) object).clone(); + } + else if (object instanceof PublicCloneable) { + return ((PublicCloneable)object).clone(); + } + else if (object instanceof LinkedList) { // TODO: Why? Somebody's infatuation with LinkedList knows no limits + return ((LinkedList) object).clone(); + } + + try { + Method cloneMethod = object.getClass().getMethod("clone"); + return cloneMethod.invoke(object); + } catch (NoSuchMethodException e) { + log.warning("'" + object + "' is Cloneable, but has no clone method - will use the same instance in all requests"); + return null; + } catch (IllegalAccessException e) { + log.warning("'" + object + "' is Cloneable, but clone method cannot be accessed - will use the same instance in all requests"); + return null; + } catch (InvocationTargetException e) { + throw new RuntimeException("Exception cloning '" + object + "'", e); + } + } + + @Override + public Map<String, Object> listProperties(CompoundName path, Map<String, String> context, Properties substitution) { + Map<String, Object> map = super.listProperties(path, context, substitution); + + for (Map.Entry<CompoundName, Object> entry : properties.entrySet()) { + if ( ! entry.getKey().hasPrefix(path)) continue; + CompoundName propertyName = entry.getKey().rest(path.size()); + if (propertyName.isEmpty()) continue; + map.put(propertyName.toString(), entry.getValue()); + } + return map; + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java b/processing/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java new file mode 100644 index 00000000000..f15b1a93334 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.properties; + +/** + * This interface publicly exposes the clone method. + * Classes which are used in request properties may implement this to allow faster cloning of the request. + * + * @author bratseth + * @since 5.66 + */ +public interface PublicCloneable<T> extends Cloneable { + + public T clone(); + +} diff --git a/processing/src/main/java/com/yahoo/processing/request/properties/package-info.java b/processing/src/main/java/com/yahoo/processing/request/properties/package-info.java new file mode 100644 index 00000000000..c0b5ccad151 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/request/properties/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.request.properties; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/processing/src/main/java/com/yahoo/processing/response/AbstractData.java b/processing/src/main/java/com/yahoo/processing/response/AbstractData.java new file mode 100644 index 00000000000..3595315bd62 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/AbstractData.java @@ -0,0 +1,30 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.processing.Request; + +/** + * Convenience superclass for implementations of data. This contains no payload. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public abstract class AbstractData extends ListenableFreezableClass implements Data { + + private Request request; + + /** + * Creates some data marked with the request that created it + */ + public AbstractData(Request request) { + this.request = request; + } + + /** + * Returns the request that created this data + */ + public Request request() { + return request; + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/processing/src/main/java/com/yahoo/processing/response/AbstractDataList.java new file mode 100644 index 00000000000..845ab041d73 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/AbstractDataList.java @@ -0,0 +1,161 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.processing.Request; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A convenience superclass for dataList implementations which handles references to the request and to incoming data. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public abstract class AbstractDataList<DATATYPE extends Data> extends ListenableFreezableClass implements DataList<DATATYPE>, Streamed, Ordered { + + private final boolean ordered; + private final boolean streamed; + + /** + * The request which caused this to be created + */ + private final Request request; + + /** + * The recipient of incoming data to this. Never null, but may be a null recipient. + */ + private final IncomingData<DATATYPE> incomingData; + + private final ListenableFuture<DataList<DATATYPE>> completedFuture; + + /** + * Creates a simple data list which does not allow late incoming data + * + * @param request the request which created this data list + */ + protected AbstractDataList(Request request) { + // Cannot call the constructor below because this must be given out below + this.request = request; + this.incomingData = new IncomingData.NullIncomingData<>(this); + this.completedFuture = new DrainOnGetFuture<>(this); + ordered = true; + streamed = true; + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected AbstractDataList(Request request, IncomingData<DATATYPE> incomingData) { + this(request, incomingData, true, true); + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected AbstractDataList(Request request, IncomingData<DATATYPE> incomingData, boolean ordered, boolean streamed) { + if (request == null) throw new NullPointerException("Request cannot be null"); + if (incomingData == null) throw new NullPointerException("incomingData cannot be null"); + + this.request = request; + this.incomingData = incomingData; + this.completedFuture = new DrainOnGetFuture<>(this); + this.ordered = ordered; + this.streamed = streamed; + } + + /** + * Returns the request which created this data + */ + public Request request() { + return request; + } + + /** + * Returns the holder of incoming data to this. + * This may be used to add, consume, wait for and be notified about incoming data. + * If this instance does not support late incoming data, the read methods of the return object behaves + * as expected and is synchronization free. The write methods throws an exception. + */ + public IncomingData<DATATYPE> incoming() { + return incomingData; + } + + public ListenableFuture<DataList<DATATYPE>> complete() { + return completedFuture; + } + + @Override + public boolean isOrdered() { return ordered; } + + @Override + public boolean isStreamed() { return streamed; } + + public String toString() { + return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]"); + } + + public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + + private final DataList<DATATYPE> owner; + + public DrainOnGetFuture(DataList<DATATYPE> owner) { + this.owner = owner; + } + + /** + * Returns false as this is not cancellable + */ + @Override + public boolean cancel(boolean b) { + return false; + } + + /** + * Returns false as this is not cancellable + */ + @Override + public boolean isCancelled() { + return false; + } + + /** + * Wait until all data is available. When this returns all data is available in the returned data list. + */ + @Override + public DataList<DATATYPE> get() throws InterruptedException, ExecutionException { + return drain(owner.incoming().completed().get()); + } + + /** + * Wait until all data is available. + * When and if this returns normally all data is available in the returned data list + */ + @Override + public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + return drain(owner.incoming().completed().get(timeout, timeUnit)); + } + + private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) { + for (DATATYPE item : dataList.incoming().drain()) + dataList.add(item); + set(dataList); // Signal completion to listeners + return dataList; + } + + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/ArrayDataList.java b/processing/src/main/java/com/yahoo/processing/response/ArrayDataList.java new file mode 100644 index 00000000000..d30d1848c7f --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/ArrayDataList.java @@ -0,0 +1,130 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.yahoo.collections.FreezableArrayList; +import com.yahoo.processing.Request; + +import java.util.List; + +/** + * A data list backed by an array. + * This implementation supports subclassing. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class ArrayDataList<DATATYPE extends Data> extends AbstractDataList<DATATYPE> { + + private final FreezableArrayList<DATATYPE> dataList = new FreezableArrayList<>(true); + + /** + * Creates a simple data list which does not allow late incoming data + * + * @param request the request which created this data list + */ + protected ArrayDataList(Request request) { + super(request); + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected ArrayDataList(Request request, IncomingData<DATATYPE> incomingData) { + this(request, incomingData, true, true); + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected ArrayDataList(Request request, IncomingData<DATATYPE> incomingData, boolean ordered, boolean streamed) { + super(request, incomingData, ordered, streamed); + } + + /** + * Creates a simple data list which does not allow late incoming data + * + * @param request the request which created this data list + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> create(Request request) { + return new ArrayDataList<>(request); + } + + /** + * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData) + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsync(Request request) { + DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>(); + ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData); + incomingData.assignOwner(dataList); + return dataList; + } + + /** + * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData), + * and where this data can be rendered in any order. + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsyncUnordered(Request request) { + DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>(); + ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData, false, true); + incomingData.assignOwner(dataList); + return dataList; + } + + /** + * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData) + * and where this data cannot be returned to clients until this is completed. + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsyncNonstreamed(Request request) { + DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>(); + ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData, true, false); + incomingData.assignOwner(dataList); + return dataList; + } + + public DATATYPE add(DATATYPE data) { + dataList.add(data); + return data; + } + + /** + * Returns the data element at index + */ + public DATATYPE get(int index) { + return dataList.get(index); + } + + /** + * Returns a reference to the list backing this. The list may be modified freely, + * unless this is frozen. If frozen, the only permissible write operations are those that + * add new items to the end of the list. + */ + public List<DATATYPE> asList() { + return dataList; + } + + @Override + public void addDataListener(Runnable runnable) { + dataList.addListener(runnable); + } + + /** + * Irreversibly prevent further changes to the items of this. + * This allows the processing engine to start streaming the current content of this list back to the + * client (if applicable). + * <p> + * Adding new items to the end of this list is permitted even after freeze. + * If frozen, those items may be streamed back to the client immediately on add. + * <p> + * Calling this on a frozen list has no effect. + */ + public void freeze() { + super.freeze(); + dataList.freeze(); + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/Data.java b/processing/src/main/java/com/yahoo/processing/response/Data.java new file mode 100644 index 00000000000..31207210ef2 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/Data.java @@ -0,0 +1,22 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.yahoo.component.provider.ListenableFreezable; +import com.yahoo.processing.Request; + +/** + * A data item created due to a processing request. + * <p> + * If a data item is <i>frozen</i> it is illegal to make further changes to its payload or referenced request. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +// TODO: Have DataList implement this instead, probably (should be a safe change in practise) +public interface Data extends ListenableFreezable { + + /** + * Returns the request that created this data + */ + public Request request(); + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/DataList.java b/processing/src/main/java/com/yahoo/processing/response/DataList.java new file mode 100644 index 00000000000..42cb1f98a70 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/DataList.java @@ -0,0 +1,85 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.List; +import java.util.concurrent.Executor; + +/** + * A list of data items created due to a processing request. + * This list is itself a data item, allowing data items to be organized into a composite tree. + * <p> + * A data list can be frozen even though its child data items are not. + * When a datalist is frozen the only permissible write operation is to add new items + * to the end of the list. + * <p> + * Content in a frozen list may be returned to the requesting client immediately by the underlying engine, + * even if the Response owning the list is not returned yet. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public interface DataList<DATATYPE extends Data> extends Data { + + /** + * Adds a child data item to this. + * + * @param data the data to add to this + * @return the input data instance, for chaining + */ + public DATATYPE add(DATATYPE data); + + public DATATYPE get(int index); + + /** + * Returns the content of this as a List. + * The returned list is either a read-only snapshot or an editable reference to the content of this. + * If the returned list is editable and this is frozen, the only allowed operation is to add new items + * to the end of the list. + */ + public List<DATATYPE> asList(); + + /** + * Returns the buffer of incoming/future data to this list. + * This can be used to provide data to this list from other threads, after its creation, + * and to consume, wait for or be notified upon the arrival of such data. + * <p> + * Some list instances do not support late incoming data, + * such lists responds to <i>read</i> calls to IncomingData as expected and without + * incurring any synchronization, and throws an exception on <i>write</i> calls. + */ + public IncomingData<DATATYPE> incoming(); + + /** + * Returns a future in which all incoming data in this has become available. + * This has two uses: + * <ul> + * <li>Calling {@link #get} on this future will block (if necessary) until all incoming data has arrived, + * transfer that data from the incoming buffer into this list and invoke any listeners on this event + * on the calling thread. + * <li>Adding a listener on this future will cause it to be called when completion().get() is called, <i>after</i> + * the incoming data has been moved to this thread and <i>before</i> the get() call returns. + * </ul> + * <p> + * Note that if no thread calls completed().get(), this future will never occur. + * <p> + * Any data list consumer who wishes to make sure it sees the complete data for this list + * <i>must</i> call <code>dataList.complete().get()</code> before consuming this list. + * If a guaranteed non-blocking call to this method is desired, register a listener on the future where all + * data is available for draining (that is, on <code>dataList.incoming().completed()</code>) + * and resume by calling this method from the listener. + * <p> + * Making this call on a list which does not support future data always returns immediately and + * causes no memory synchronization cost. + */ + public ListenableFuture<DataList<DATATYPE>> complete(); + + /** + * Adds a listener which is invoked every time data is added to this list. + * The listener is always invoked on the same thread which is adding the data, + * and hence it can modify this list freely without synchronization. + */ + public void addDataListener(Runnable runnable); + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/processing/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java new file mode 100644 index 00000000000..212644a186f --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java @@ -0,0 +1,150 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.collections.Tuple2; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; + +/** + * The default incoming data implementation + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> { + + private DataList<DATATYPE> owner = null; + + private final SettableFuture<DataList<DATATYPE>> completionFuture; + + private final List<DATATYPE> dataList = new ArrayList<>(); + + private List<Tuple2<Runnable,Executor>> newDataListeners = null; + + /** + * If this is completed no more data can be added + */ + private boolean complete = false; + + /** + * Creates an instance which must be assigned an owner after creation + */ + public DefaultIncomingData() { + this(null); + } + + public DefaultIncomingData(DataList<DATATYPE> owner) { + assignOwner(owner); + completionFuture = SettableFuture.create(); + } + + /** + * Assigns the owner of this. Throws an exception if the owner is already set. + */ + public final void assignOwner(DataList<DATATYPE> owner) { + if (this.owner != null) throw new NullPointerException("Owner of " + this + " was already assigned"); + this.owner = owner; + } + + @Override + public DataList<DATATYPE> getOwner() { + return owner; + } + + @Override + public ListenableFuture<DataList<DATATYPE>> completed() { + return completionFuture; + } + + /** + * Returns whether the data in this is complete + */ + @Override + public synchronized boolean isComplete() { + return complete; + } + + /** + * Add new data and mark this as completed + */ + @Override + public synchronized void addLast(DATATYPE data) { + addLast(Collections.singletonList(data)); + } + + /** + * Add new data without completing this + */ + @Override + public synchronized void add(DATATYPE data) { + add(Collections.singletonList(data)); + } + + /** + * Add new data and mark this as completed + */ + @Override + public synchronized void addLast(List<DATATYPE> data) { + add(data); + markComplete(); + } + + /** + * Add new data without completing this + */ + @Override + public synchronized void add(List<DATATYPE> data) { + if (complete) throw new IllegalStateException("Attempted to add data to completed " + this); + + dataList.addAll(data); + notifyDataListeners(); + } + + /** + * Mark this as completed and notify any listeners + */ + @Override + public synchronized void markComplete() { + complete = true; + completionFuture.set(owner); + } + + /** + * Get and remove all the data currently available in this. + * The returned list is a modifiable fresh instance owned by the caller. + */ + public synchronized List<DATATYPE> drain() { + List<DATATYPE> dataListCopy = new ArrayList<>(dataList); + dataList.clear(); + return dataListCopy; + } + + @Override + public void addNewDataListener(Runnable listener, Executor executor) { + synchronized (this) { + if (newDataListeners == null) + newDataListeners = new ArrayList<>(); + newDataListeners.add(new Tuple2<>(listener, executor)); + if (dataList.isEmpty()) return; + } + notifyDataListeners(); + } + + private void notifyDataListeners() { + if (newDataListeners == null) return; + for (Tuple2<Runnable, Executor> listener : newDataListeners) { + listener.second.execute(listener.first); + } + } + + @Override + public String toString() { + return "incoming: " + (complete ? "complete" : "incomplete") + ", data " + dataList; + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/FutureResponse.java b/processing/src/main/java/com/yahoo/processing/response/FutureResponse.java new file mode 100644 index 00000000000..7653b8e6c28 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/FutureResponse.java @@ -0,0 +1,82 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.ForwardingFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.request.ErrorMessage; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A processing response which will arrive in the future. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class FutureResponse extends ForwardingFuture<Response> { + + private final Request request; + + /** + * Only used for generating messages + */ + private final Execution execution; + + private final static Logger log = Logger.getLogger(FutureResponse.class.getName()); + + private final ListenableFutureTask<Response> futureTask; + + public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) { + this.futureTask = ListenableFutureTask.create(callable); + this.request = request; + this.execution = execution; + } + + @Override + public ListenableFutureTask<Response> delegate() { + return futureTask; + } + + public + @Override + Response get() { + try { + return super.get(); + } catch (InterruptedException e) { + return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); + } catch (ExecutionException e) { + log.log(Level.WARNING, "Exception on executing " + execution + " for " + request, e); + return new Response(request, new ErrorMessage("Error in '" + execution + "'", e)); + } + } + + public + @Override + Response get(long timeout, TimeUnit timeunit) { + try { + return super.get(timeout, timeunit); + } catch (InterruptedException e) { + return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); + } catch (ExecutionException e) { + log.log(Level.WARNING, "Exception on executing " + execution + " for " + request, e); + return new Response(request, new ErrorMessage("Error in '" + execution + "'", e)); + } catch (TimeoutException e) { + return new Response(request, new ErrorMessage("Error executing '" + execution + "': " + " Chain timed out.")); + } + } + + /** + * Returns the query used in this execution, never null + */ + public Request getRequest() { + return request; + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/IncomingData.java b/processing/src/main/java/com/yahoo/processing/response/IncomingData.java new file mode 100644 index 00000000000..5939f56a8f3 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/IncomingData.java @@ -0,0 +1,219 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * A data list own once instance of this which can be used to provide data asynchronously to the list, + * and consume, wait for or be notified upon the arrival of such data. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public interface IncomingData<DATATYPE extends Data> { + + /** + * Returns the owner (target DataList) of this. + * Note that accessing the owner from the thread producing incoming data + * is generally *not* thread safe. + */ + public DataList<DATATYPE> getOwner(); + + /** + * Returns a future in which all the incoming data that will be produced in this is available. + * Listeners on this are invoked on the thread producing the incoming data (or a thread spawned from it), + * which in general is separate from the thread using the data list. Hence, listeners on this even cannot + * in general assume that they may modify the data list or the request. + * <p> + * The data is not {@link #drain drained} into the owner of this by this method. That must be done + * by the thread using the data list. + * <p> + * This return the list owning this for convenience. + */ + public abstract ListenableFuture<DataList<DATATYPE>> completed(); + + /** + * Returns whether this is complete + */ + public boolean isComplete(); + + /** + * Add new data and mark this as completed + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + public void addLast(DATATYPE data); + + /** + * Add new data without completing this + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + public void add(DATATYPE data); + + /** + * Add new data and mark this as completed + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + public void addLast(List<DATATYPE> data); + + /** + * Add new data without completing this. + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + public void add(List<DATATYPE> data); + + /** + * Mark this as completed and notify any listeners. If this is already complete this method does nothing. + */ + public void markComplete(); + + /** + * Get and remove all the data currently available in this + */ + public List<DATATYPE> drain(); + + /** + * Add a listener which will be invoked every time new data is added to this. + * This listener may be invoked at any time in any thread, any thread synchronization is left + * to the listener itself + */ + public void addNewDataListener(Runnable listener, Executor executor); + + /** + * Creates a null implementation of this which is empty and complete at creation: + * <ul> + * <li>Provides immediate return without incurring any memory synchronization for + * any read method. + * <li>Throws an exception on any write method + * </ul> + * <p> + * This allows consumers to check for completion the same way whether or not the data list in question + * supports asynchronous addition of data, and without incurring unnecessary costs. + */ + public static final class NullIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> { + + private DataList<DATATYPE> owner; + private final ImmediateFuture<DATATYPE> completionFuture; + + public NullIncomingData(DataList<DATATYPE> owner) { + this.owner = owner; + completionFuture = new ImmediateFuture<>(owner); + } + + public ListenableFuture<DataList<DATATYPE>> completed() { + return completionFuture; + } + + @Override + public DataList<DATATYPE> getOwner() { + return owner; + } + + /** + * Returns true + */ + @Override + public boolean isComplete() { + return true; + } + + /** + * @throws IllegalStateException as this is read only + */ + public void addLast(DATATYPE data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * @throws IllegalStateException as this is read only + */ + public void add(DATATYPE data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * @throws IllegalStateException as this is read only + */ + public void addLast(List<DATATYPE> data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * @throws IllegalStateException as this is read only + */ + public void add(List<DATATYPE> data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * Do nothing as this is already complete + */ + public void markComplete() { + } + + public List<DATATYPE> drain() { + return Collections.emptyList(); + } + + /** + * Adds a new data listener to this - this is a no-op + * as new data can never be added to this implementation. + */ + public void addNewDataListener(Runnable listener, Executor executor) { } + + public String toString() { + return "(no incoming)"; + } + + /** + * A future which is always done and incurs no synchronization. + * This is semantically the same as Futures.immediateFuture but contrary to it, + * this never causes any memory synchronization when accessed. + */ + public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + + private DataList<DATATYPE> owner; + + public ImmediateFuture(DataList<DATATYPE> owner) { + this.owner = owner; // keep here to avoid memory synchronization for access + set(owner); // Signal completion (for future listeners) + } + + @Override + public boolean cancel(boolean b) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public DataList<DATATYPE> get() { + return owner; + } + + @Override + public DataList<DATATYPE> get(long l, TimeUnit timeUnit) { + return owner; + } + + } + + } + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/Ordered.java b/processing/src/main/java/com/yahoo/processing/response/Ordered.java new file mode 100644 index 00000000000..10aeaaeb952 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/Ordered.java @@ -0,0 +1,18 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +/** + * This is an <i>optional marker interface</i>. + * DataLists may implement this to return false to indicate that the order of the elements of + * the list is insignificant. The usage of this is to allow the content of a list to be rendered in the order + * in which it completes rather than in the order in which it is added to the list. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + * @since 5.1.19 + */ +public interface Ordered { + + /** Returns false if the data in this list can be returned in any order. Default: true, meaning the order matters */ + public boolean isOrdered(); + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/Streamed.java b/processing/src/main/java/com/yahoo/processing/response/Streamed.java new file mode 100644 index 00000000000..6eab5a3287f --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/Streamed.java @@ -0,0 +1,21 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +/** + * This is an <i>optional marker interface</i>. + * DataLists may implement this to return false to indicate that no data from the list should be returned to clients + * until it is completed. This is useful in cases where some decision making which may impact the content of the list + * must be deferred until the list is complete. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + * @since 5.1.19 + */ +public interface Streamed { + + /** + * Returns false if the data in this list can not be returned until it is completed. + * Default: true, meaning eager streaming of the data is permissible. + */ + public boolean isStreamed(); + +} diff --git a/processing/src/main/java/com/yahoo/processing/response/package-info.java b/processing/src/main/java/com/yahoo/processing/response/package-info.java new file mode 100644 index 00000000000..649a719c7d3 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/response/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.response; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java b/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java new file mode 100644 index 00000000000..fe1e0d75ee3 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java @@ -0,0 +1,554 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.AsyncExecution; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.execution.ExecutionWithResponse; +import com.yahoo.processing.execution.RunnableExecution; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.*; + +import java.util.*; +import java.util.concurrent.ExecutionException; + +/** + * A collection of processors for test purposes. + * + * @author bratseth + */ +public class ProcessorLibrary { + + private ProcessorLibrary() { + } + + // ---------------------------------------- Data types + + public static class StringData extends AbstractData { + + private String string; + + public StringData(Request request, String string) { + super(request); + this.string = string; + } + + public void setString(String string) { + this.string = string; + } + + @Override + public String toString() { + return string; + } + + } + + public static class MapData extends AbstractData { + + private Map map = new LinkedHashMap(); + + public MapData(Request request) { + super(request); + } + + public Map map() { return map; } + + @Override + public String toString() { + return "map data: " + map; + } + + } + + // ---------------------------------------- DataLists + + public static class UnorderedArrayDataList extends ArrayDataList implements Ordered { + + public UnorderedArrayDataList(Request request) { + super(request); + } + + @Override + public boolean isOrdered() {return false; } + + } + + // ---------------------------------------- Processors + + /** + * Makes some modifications to the request, passes it on and finally removes one data item from the response + */ + public static class CombineData extends Processor { + + public Response process(Request request, Execution execution) { + request.properties().set("appendage", request.properties().getInteger("appendage") + 1); + Response response = execution.process(request); + + // Modify the response + StringData first = (StringData) response.data().get(0); + StringData third = (StringData) response.data().get(2); + first.setString(first.toString() + ", " + third.toString()); + response.data().asList().remove(2); + return response; + } + + } + + /** + * Sends the request multiple times to get at least 6 pieces of data + */ + public static class Get6DataItems extends Processor { + + @SuppressWarnings("unchecked") + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + while (response.data().asList().size() < 6) { + request.properties().set("appendage", request.properties().getInteger("appendage") + 1); + Response additional = execution.process(request); + response.mergeWith(additional); + response.data().asList().addAll(additional.data().asList()); + } + return response; + } + + } + + /** + * Produces 3 pieces of string data + */ + public static class DataSource extends Processor { + + @SuppressWarnings("unchecked") + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + response.data().add(new StringData(request, "first." + request.properties().get("appendage"))); + response.data().add(new StringData(request, "second." + request.properties().get("appendage"))); + response.data().add(new StringData(request, "third." + request.properties().get("appendage"))); + return response; + } + + } + + public static class Federator extends Processor { + + private final List<Chain<? extends Processor>> chains; + + private final boolean ordered; + + /** + * Federates over the given chains. Returns an ordered response. + */ + @SafeVarargs + public Federator(Chain<? extends Processor>... chains) { + this(true, chains); + } + + /** + * Federates over the given chains + * + * @param ordered true if the returned list should be ordered (default), false if it should be permissible + * to render the datalist from each federated source in the order it completes. + */ + @SafeVarargs + public Federator(boolean ordered, Chain<? extends Processor>... chains) { + this.chains = Arrays.asList(chains); + this.ordered = ordered; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = ordered ? new Response(request) : new Response(new UnorderedArrayDataList(request)); + + List<FutureResponse> futureResponses = new ArrayList<>(chains.size()); + for (Chain<? extends Processor> chain : chains) { + + futureResponses.add(new AsyncExecution(chain, execution).process(request.clone())); + } + AsyncExecution.waitForAll(futureResponses, 1000); + for (FutureResponse futureResponse : futureResponses) { + Response federatedResponse = futureResponse.get(); + response.data().add(federatedResponse.data()); + response.mergeWith(federatedResponse); + } + return response; + } + } + + /** + * A federator which supports returning frozen data from each chain before the response is returned. + */ + public static class EagerReturnFederator extends Processor { + + private final List<Chain<? extends Processor>> chains; + + private final boolean ordered; + + /** + * Federates over the given chains. Returns an ordered response. + */ + @SafeVarargs + public EagerReturnFederator(Chain<? extends Processor>... chains) { + this(true, chains); + } + + /** + * Federates over the given chains + * + * @param ordered true if the returned list should be ordered (default), false if it should be permissible + * to render the datalist from each federated source in the order it completes. + */ + @SafeVarargs + public EagerReturnFederator(boolean ordered, Chain<? extends Processor>... chains) { + this.chains = Arrays.asList(chains); + this.ordered = ordered; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + List<FutureResponse> futureResponses = new ArrayList<>(chains.size()); + for (Chain<? extends Processor> chain : chains) { + futureResponses.add(new AsyncExecution(chain, execution).process(request.clone())); + } + AsyncExecution.waitForAll(futureResponses, 1000); + Response response = ordered ? new Response(request) : new Response(new UnorderedArrayDataList(request)); + for (FutureResponse futureResponse : futureResponses) { + Response federatedResponse = futureResponse.get(); + response.data().add(federatedResponse.data()); + response.mergeWith(federatedResponse); + } + return response; + } + } + + /** + * Adds a data element containing the (recursive) count of concrete (non-list) data elements in the response + */ + public static class DataCounter extends Processor { + + private String prefix = ""; + + public DataCounter() { + } + + /** + * The prefix "[name] " is prepended to the string data + */ + public DataCounter(String name) { + prefix = "[" + name + "] "; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + int dataCount = countData(response.data()); + response.data().add(new StringData(request, prefix + "Data count: " + dataCount)); + return response; + } + + private int countData(DataList<? extends Data> dataList) { + int count = 0; + for (Data data : dataList.asList()) { + if (data instanceof DataList) + count += countData((DataList<?>) data); + else + count++; + } + return count; + } + } + + // TODO: Replace by below? + public static class FutureDataSource extends Processor { + + /** The list of incoming data this has created */ + public final List<IncomingData> incomingData = new ArrayList<>(); + + @Override + public Response process(Request request, Execution execution) { + ArrayDataList dataList = ArrayDataList.createAsync(request); + incomingData.add(dataList.incoming()); + return new Response(dataList); + } + + } + + /** Allows waiting for that request to happen. */ + public static class ListenableFutureDataSource extends Processor { + + private final boolean ordered, streamed; + + /** The incoming data this has created */ + public final SettableFuture<IncomingData> incomingData = SettableFuture.create(); + + /** Create an instance which returns ordered, streamable data */ + public ListenableFutureDataSource() { this(true, true); } + + public ListenableFutureDataSource(boolean ordered, boolean streamed) { + this.ordered = ordered; + this.streamed = streamed; + } + + @Override + public Response process(Request request, Execution execution) { + ArrayDataList dataList; + if (! ordered) + dataList = ArrayDataList.createAsyncUnordered(request); + else if (! streamed) + dataList = ArrayDataList.createAsyncNonstreamed(request); + else + dataList = ArrayDataList.createAsync(request); + incomingData.set(dataList.incoming()); + return new Response(dataList); + } + + } + + /** Allows waiting for that request to happen. */ + public static class RequestCounter extends Processor { + + /** The incoming data this has created */ + public final SettableFuture<IncomingData> incomingData = SettableFuture.create(); + + @Override + public Response process(Request request, Execution execution) { + ArrayDataList dataList = ArrayDataList.createAsync(request); + incomingData.set(dataList.incoming()); + return new Response(dataList); + } + + } + + /** + * Multiples the amount of data returned by parallelism by performing parallel executions of the rest of the chain + */ + public static class BlockingSplitter extends Processor { + + private final int parallelism; + + public BlockingSplitter(int parallelism) { + this.parallelism = parallelism; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + try { + // start executions in other threads + List<FutureResponse> futures = new ArrayList<>(parallelism - 1); + for (int i = 1; i < parallelism; i++) { + futures.add(new AsyncExecution(execution).process(request.clone())); + } + + // complete this execution + Response response = execution.process(request); + + // wait for other executions and merge the responses + for (Response additionalResponse : AsyncExecution.waitForAll(futures, 1000)) { + additionalResponse.data().complete().get(); // block until we have all the data elements + for (Object item : additionalResponse.data().asList()) + response.data().add((Data) item); + response.mergeWith(additionalResponse); + } + return response; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + } + + /** + * Registers an async processing of the chain given in the constructor on completion of the data in the response + */ + public static class AsyncDataProcessingInitiator extends Processor { + + private final Chain<Processor> asyncChain; + + public AsyncDataProcessingInitiator(Chain<Processor> asyncChain) { + this.asyncChain = asyncChain; + } + + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + // TODO: Consider for to best provide helpers for this + response.data().complete().addListener(new RunnableExecution(request, + new ExecutionWithResponse(asyncChain, response, execution)), + MoreExecutors.sameThreadExecutor()); + return response; + } + + } + + /** + * Registers a chain to be invoked each time new data becomes available in the first child list + */ + public static class StreamProcessingInitiator extends Processor { + + private final Chain<Processor> streamChain; + + public StreamProcessingInitiator(Chain<Processor> streamChain) { + this.streamChain = streamChain; + } + + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + // TODO: Consider for to best provide helpers for this + response.data().addDataListener(new RunnableExecution(request, + new ExecutionWithResponse(streamChain, response, execution))); + return response; + } + + } + + /** + * A processor which on invocation prints the string given on construction + */ + public static class Echo extends Processor { + + private String s; + + public Echo(String s) { + this.s = s; + } + + @Override + public Response process(Request request, Execution execution) { + System.out.println(s); + return execution.process(request); + } + + } + + /** + * A processor which adds a StringData item containing the string given in the constructor to every response + */ + public static class StringDataAdder extends Processor { + + private String string; + + public StringDataAdder(String string) { + this.string = string; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + response.data().add(new StringData(request, string)); + return response; + } + + } + + /** + * A processor which adds an ErrorMessage to the request of the top level + * data of each returned response. + */ + public static class ErrorAdder extends Processor { + + private ErrorMessage errorMessage; + + public ErrorAdder(ErrorMessage errorMessage) { + this.errorMessage = errorMessage; + } + + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + response.data().request().errors().add(errorMessage); + return response; + } + + } + + /** + * A processor which adds a List of StringData items containing the strings given in the constructor to every response + */ + public static class StringDataListAdder extends Processor { + + private String[] strings; + + public StringDataListAdder(String... strings) { + this.strings = strings; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + DataList<StringData> list = ArrayDataList.create(request); + for (String string : strings) + list.add(new StringData(request, string)); + response.data().add(list); + return response; + } + + } + + /** + * Adds a the given trace message at the given trace level + */ + public static class Trace extends Processor { + + private String traceMessage; + private int traceLevel; + + public Trace(String traceMessage, int traceLevel) { + this.traceMessage = traceMessage; + this.traceLevel = traceLevel; + } + + @Override + public Response process(Request request, Execution execution) { + execution.trace().trace(traceMessage, traceLevel); + return execution.process(request); + } + + } + + public static final class StatusSetter extends Processor { + + private final int status; + + public StatusSetter(int status) { + this.status = status; + } + + @Override + public com.yahoo.processing.Response process(com.yahoo.processing.Request request, Execution execution) { + request.errors().add(new ErrorMessage(status, "")); + return execution.process(request); + } + + } + + /** + * Adds (key, value) to the log value trace. + */ + public static class LogValueAdder extends Processor { + private final String key; + private final String value; + + public LogValueAdder(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public Response process(Request request, Execution execution) { + execution.trace().logValue(key, value); + return execution.process(request); + } + } +} diff --git a/processing/src/main/java/com/yahoo/processing/test/Responses.java b/processing/src/main/java/com/yahoo/processing/test/Responses.java new file mode 100644 index 00000000000..bd795770b87 --- /dev/null +++ b/processing/src/main/java/com/yahoo/processing/test/Responses.java @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test; + +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.DataList; + +/** + * Static utilities + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + * @since 5.1.13 + */ +public class Responses { + + /** + * Returns a data item as a recursively indented string + */ + public static String recursiveToString(Data data) { + StringBuilder b = new StringBuilder(); + asString(data, b, ""); + return b.toString(); + } + + private static void asString(Data data, StringBuilder b, String indent) { + b.append(indent).append(data).append("\n"); + if (!(data instanceof DataList)) return; + for (Data childData : ((DataList<? extends Data>) data).asList()) { + asString(childData, b, indent.concat(" ")); + } + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/ResponseTestCase.java b/processing/src/test/java/com/yahoo/processing/ResponseTestCase.java new file mode 100644 index 00000000000..4578299adff --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/ResponseTestCase.java @@ -0,0 +1,139 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.yahoo.processing.response.ArrayDataList; +import com.yahoo.processing.response.DataList; +import com.yahoo.processing.test.ProcessorLibrary; +import com.yahoo.processing.test.Responses; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.*; + +/** + * @author bratseth + */ +@SuppressWarnings("unchecked") +public class ResponseTestCase { + + /** + * Create a nested async tree of data elements, complete it recursively and check completion order. + * Check the recursive toString printing along the way. + * List variable names ends by numbers specifying the index of the list at each level. + */ + @SuppressWarnings("unchecked") + @Test + public void testRecursiveCompletionAndToString() throws InterruptedException, ExecutionException { + // create lists + Request request = new Request(); + DataList list1 = ArrayDataList.create(request); + DataList list11 = ArrayDataList.create(request); + DataList list12 = ArrayDataList.createAsync(request); + DataList list13 = ArrayDataList.createAsync(request); + DataList list14 = ArrayDataList.create(request); + DataList list121 = ArrayDataList.createAsync(request); + DataList list122 = ArrayDataList.create(request); + DataList list123 = ArrayDataList.createAsync(request); + DataList list1231 = ArrayDataList.createAsync(request); + DataList list1232 = ArrayDataList.create(request); + // wire tree + list1.add(list11); + list1.add(list12); + list1.add(list13); + list1.add(list14); + list12.add(list121); + list12.add(list122); + list12.add(list123); + list123.add(list1231); + list123.add(list1232); + // add sync data elements + list1.add(new ProcessorLibrary.StringData(request,"list1")); + list12.add(new ProcessorLibrary.StringData(request,"list12")); + list14.add(new ProcessorLibrary.StringData(request,"list14")); + list122.add(new ProcessorLibrary.StringData(request,"list122")); + list1231.add(new ProcessorLibrary.StringData(request,"list1231")); + + assertEqualsIgnoreObjectNumbers("Uncompleted tree, no incoming",uncompletedTreeUncompletedIncoming,Responses.recursiveToString(list1)); + + // provide all async incoming data + list12.incoming().markComplete(); + list121.incoming().addLast(new ProcessorLibrary.StringData(request,"list121async1")); + list123.incoming().markComplete(); + list1231.incoming().add(new ProcessorLibrary.StringData(request,"list13231async1")); + list1231.incoming().addLast(new ProcessorLibrary.StringData(request,"list1231async2")); + list13.incoming().add(new ProcessorLibrary.StringData(request,"list13async1")); + list13.incoming().addLast(new ProcessorLibrary.StringData(request,"list13async2")); + + assertEqualsIgnoreObjectNumbers("Uncompleted tree, incoming complete",uncompletedTreeCompletedIncoming, Responses.recursiveToString(list1)); + + // complete all + Response.recursiveComplete(list1).get(); + assertEqualsIgnoreObjectNumbers("Completed tree",completedTree,Responses.recursiveToString(list1)); + } + + private void assertEqualsIgnoreObjectNumbers(String explanation,String expected,String actual) { + assertEquals(explanation,expected,removeObjectNumbers(actual)); + } + + /** Removes all object numbers (occurrences of @hexnumber) */ + private String removeObjectNumbers(String s) { + return s.replaceAll("@[0-9a-f]+",""); + } + + private static final String uncompletedTreeUncompletedIncoming= + "com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " list122\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" + + " list1231\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " list12\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: incomplete, data []]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " list14\n" + + " list1\n"; + + private static final String uncompletedTreeCompletedIncoming= + "com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data []]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data [list121async1]]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " list122\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data []]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data [list13231async1, list1231async2]]\n" + + " list1231\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " list12\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, incoming: complete, data [list13async1, list13async2]]\n" + + " com.yahoo.processing.response.ArrayDataList [incomplete, (no incoming)]\n" + + " list14\n" + + " list1\n"; + + private static final String completedTree= + "com.yahoo.processing.response.ArrayDataList [completed]\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " list121async1\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " list122\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " list1231\n" + + " list13231async1\n" + + " list1231async2\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " list12\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " list13async1\n" + + " list13async2\n" + + " com.yahoo.processing.response.ArrayDataList [completed]\n" + + " list14\n" + + " list1\n"; +} diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/AsyncExecutionTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/AsyncExecutionTestCase.java new file mode 100644 index 00000000000..bdf04859151 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/execution/test/AsyncExecutionTestCase.java @@ -0,0 +1,46 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution.test; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import org.junit.Test; + +import static com.yahoo.processing.test.ProcessorLibrary.*; +import static org.junit.Assert.assertEquals; + +/** + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class AsyncExecutionTestCase { + + /** Execute a processing chain which forks off into multiple threads */ + @Test + public void testAsyncExecution() { + // Create a chain + Chain<Processor> chain=new Chain<>(new CombineData(),new BlockingSplitter(2),new Get6DataItems(), new DataSource()); + + // Execute it + Request request=new Request(); + request.properties().set("appendage",1); + Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request); + + // Verify the result + assertEquals(6*2-1,response.data().asList().size()); + assertEquals("first.2, third.2",response.data().get(0).toString()); + assertEquals("second.2",response.data().get(1).toString()); + assertEquals("first.3",response.data().get(2).toString()); + assertEquals("second.3",response.data().get(3).toString()); + assertEquals("third.3",response.data().get(4).toString()); + // from the parallel execution + assertEquals("first.2",response.data().get(5).toString()); + assertEquals("second.2",response.data().get(6).toString()); + assertEquals("third.2",response.data().get(7).toString()); + assertEquals("first.3",response.data().get(8).toString()); + assertEquals("second.3",response.data().get(9).toString()); + assertEquals("third.3",response.data().get(10).toString()); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/ExecutionContextTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/ExecutionContextTestCase.java new file mode 100644 index 00000000000..f5b3121f2f8 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/execution/test/ExecutionContextTestCase.java @@ -0,0 +1,96 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution.test; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.test.ProcessorLibrary; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +/** + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class ExecutionContextTestCase extends junit.framework.TestCase { + + private Chain<Processor> chain=new Chain<Processor>(new ProcessorLibrary.DataSource()); + + /** Tests combined use of trace messages, context values and access log entries */ + public void testtrace() { + Execution execution1=Execution.createRoot(chain,2,Execution.Environment.createEmpty()); + execution1.trace().setProperty("a","a1"); + execution1.trace().logValue("a","a1"); + execution1.trace().trace("root 1", 2); + execution1.trace().setProperty("a","a2"); + execution1.trace().setProperty("b","b1"); + execution1.trace().logValue("a","a2"); + execution1.trace().logValue("b","b1"); + + Execution execution2=new Execution(chain,execution1); + execution2.trace().setProperty("b","b2"); + execution2.trace().logValue("b","b2"); + execution2.trace().trace(" child-1 1", 2); + execution2.trace().setProperty("b", "b3"); + execution2.trace().logValue("b","b3"); + + execution1.trace().setProperty("b","b4"); + execution1.trace().logValue("b","b4"); + + Execution execution3=new Execution(chain,execution1); + execution3.trace().setProperty("b","b5"); + execution3.trace().setProperty("c","c1"); + execution3.trace().logValue("b","b5"); + execution3.trace().logValue("c","c1"); + execution3.trace().trace(" child-2 1", 2); + + execution2.trace().setProperty("c","c2"); + execution2.trace().logValue("c","c2"); + + execution1.trace().trace("root 2", 2); + execution3.trace().setProperty("d", "d1"); + execution1.trace().logValue("d","d1"); + + execution2.trace().trace(" child-1 2", 2); + execution2.trace().setProperty("c", "c3"); + execution2.trace().logValue("c","c3"); + + execution1.trace().setProperty("c","c4"); + execution1.trace().logValue("c","c4"); + + Iterator<String> traceIterator=execution1.trace().traceNode().root().descendants(String.class).iterator(); + assertEquals("root 1",traceIterator.next()); + assertEquals(" child-1 1",traceIterator.next()); + assertEquals(" child-1 2",traceIterator.next()); + assertEquals(" child-2 1",traceIterator.next()); + assertEquals("root 2",traceIterator.next()); + assertFalse(traceIterator.hasNext()); + + // Verify context variables + assertEquals("a2", execution1.trace().getProperty("a")); + assertEquals("b5", execution1.trace().getProperty("b")); + assertEquals("c4", execution1.trace().getProperty("c")); + assertEquals("d1", execution1.trace().getProperty("d")); + assertNull(execution1.trace().getProperty("e")); + + // Verify access log + Set<String> logValues=new HashSet<>(); + for (Iterator<Execution.Trace.LogValue> logValueIterator=execution1.trace().logValueIterator(); logValueIterator.hasNext(); ) + logValues.add(logValueIterator.next().toString()); + assertEquals(12,logValues.size()); + assertTrue(logValues.contains("a=a1")); + assertTrue(logValues.contains("a=a2")); + assertTrue(logValues.contains("b=b1")); + assertTrue(logValues.contains("b=b2")); + assertTrue(logValues.contains("b=b3")); + assertTrue(logValues.contains("b=b4")); + assertTrue(logValues.contains("b=b5")); + assertTrue(logValues.contains("c=c1")); + assertTrue(logValues.contains("c=c2")); + assertTrue(logValues.contains("d=d1")); + assertTrue(logValues.contains("c=c3")); + assertTrue(logValues.contains("c=c4")); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java new file mode 100644 index 00000000000..09d7c38322f --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/execution/test/FutureDataTestCase.java @@ -0,0 +1,173 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution.test; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.response.DataList; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static com.yahoo.processing.test.ProcessorLibrary.*; +import static org.junit.Assert.assertEquals; + +/** + * Tests scenarios where a data producer returns a promise of some future data rather than the data itself. + * As no processor waits for the data it is returned all the way to the caller. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class FutureDataTestCase { + + /** Run a chain which ends in a processor which returns a response containing future data. */ + @SuppressWarnings("unchecked") + @Test + public void testFutureDataPassThrough() throws InterruptedException, ExecutionException, TimeoutException { + // Set up + FutureDataSource futureDataSource=new FutureDataSource(); + Chain<Processor> chain=new Chain<>(new DataCounter(),futureDataSource); + + // Execute + Request request=new Request(); + Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request); // Urk ... + + // Verify the result prior to completion of delayed data + assertEquals(1,response.data().asList().size()); + assertEquals("Data count: 0",response.data().get(0).toString()); + + // complete delayed data + assertEquals("Delayed data was requested once", 1, futureDataSource.incomingData.size()); + futureDataSource.incomingData.get(0).add(new StringData(request, "d1")); + futureDataSource.incomingData.get(0).addLast(new StringData(request, "d2")); + assertEquals("New data is not visible because we haven't asked for it", 1, response.data().asList().size()); + response.data().complete().get(1000, TimeUnit.MILLISECONDS); + assertEquals("Now the data is available", 3, response.data().asList().size()); + assertEquals("d1",response.data().get(1).toString().toString()); + assertEquals("d2",response.data().get(2).toString().toString()); + } + + /** Federate to one source which returns data immediately and one who return future data */ + @SuppressWarnings("unchecked") + @Test + public void testFederateSyncAndAsyncData() throws InterruptedException, ExecutionException, TimeoutException { + // Set up + FutureDataSource futureDataSource=new FutureDataSource(); + Chain<Processor> chain=new Chain<>(new DataCounter(),new Federator(new Chain<>(new DataSource()),new Chain<>(futureDataSource))); + + // Execute + Request request=new Request(); + request.properties().set("appendage",1); + Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request); + + // Verify the result prior to completion of delayed data + assertEquals(3,response.data().asList().size()); // The sync data list + the (currently empty) future data list) + the data count + DataList syncData=(DataList)response.data().get(0); + DataList asyncData=(DataList)response.data().get(1); + StringData countData=(StringData)response.data().get(2); + + assertEquals("The sync data is available",3,syncData.asList().size()); + assertEquals( "first.1",syncData.get(0).toString()); + assertEquals("second.1", syncData.get(1).toString()); + assertEquals( "third.1",syncData.get(2).toString()); + assertEquals("No async data yet",0,asyncData.asList().size()); + assertEquals("The data counter has run and accessed the sync data","Data count: 3",countData.toString()); + + // complete async data + futureDataSource.incomingData.get(0).add(new StringData(request, "d1")); + futureDataSource.incomingData.get(0).addLast(new StringData(request, "d2")); + assertEquals("New data is not visible because we haven't asked for it", 0, asyncData.asList().size()); + asyncData.complete().get(1000, TimeUnit.MILLISECONDS); + assertEquals("Now the data is available", 2, asyncData.asList().size()); + assertEquals("d1",asyncData.get(0).toString().toString()); + assertEquals("d2", asyncData.get(1).toString().toString()); + } + + /** Register a chain which will be called when some async data is available */ + @SuppressWarnings("unchecked") + @Test + public void testAsyncDataProcessing() throws InterruptedException, ExecutionException, TimeoutException { + // Set up + FutureDataSource futureDataSource=new FutureDataSource(); + Chain<Processor> asyncChain=new Chain<Processor>(new DataCounter()); + Chain<Processor> chain=new Chain<>(new AsyncDataProcessingInitiator(asyncChain),futureDataSource); + + // Execute + Request request=new Request(); + Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request); + + // Verify the result prior to completion of delayed data + assertEquals("No data yet",0,response.data().asList().size()); + + // complete async data + futureDataSource.incomingData.get(0).add(new StringData(request, "d1")); + assertEquals("New data is not visible because it is not complete", 0, response.data().asList().size()); + futureDataSource.incomingData.get(0).addLast(new StringData(request, "d2")); + assertEquals("Not visible because it has not been synced yet", 0, response.data().asList().size()); + response.data().complete().get(1000, TimeUnit.MILLISECONDS); + assertEquals("Now the data as well as the count is available", 3, response.data().asList().size()); + assertEquals("d1",response.data().get(0).toString().toString()); + assertEquals("d2",response.data().get(1).toString().toString()); + assertEquals("Data count: 2",response.data().get(2).toString()); + } + + /** + * Register a chain which federates over three sources, two of which are future. + * When the first of the futures are done one additional chain is to be run. + * When both are done another chain is to be run. + */ + @SuppressWarnings("unchecked") + @Test + public void testAsyncDataProcessingOfFederatedResult() throws InterruptedException, ExecutionException, TimeoutException { + // Set up + // Source 1 (async with completion chain) + FutureDataSource futureSource1=new FutureDataSource(); + Chain<Processor> asyncChainSource1=new Chain<Processor>(new DataCounter("source1")); + Chain<Processor> chainSource1=new Chain<>(new AsyncDataProcessingInitiator(asyncChainSource1),futureSource1); + // Source 2 (async source) + FutureDataSource futureSource2=new FutureDataSource(); + Chain<Processor> chainSource2=new Chain<Processor>(futureSource2); + // Source 3 (sync source) + Chain<Processor> chainSource3=new Chain<Processor>(new DataSource()); + // Main chain federating to the above - not waiting for source 1 and 2 but invoking asyncMain when both are complete + Chain<Processor> asyncMain=new Chain<Processor>(new DataCounter("main")); + Chain<Processor> main=new Chain<>(new AsyncDataProcessingInitiator(asyncMain),new Federator(chainSource1,chainSource2,chainSource3)); + + // Execute + Request request=new Request(); + Response response=Execution.createRoot(main,0,Execution.Environment.createEmpty()).process(request); + + // Verify the result prior to completion of delayed data + assertEquals("We have the sync data plus placeholders for the async lists",3,response.data().asList().size()); + DataList source1Data=((DataList)response.data().get(0)); + DataList source2Data=((DataList)response.data().get(1)); + DataList source3Data=((DataList)response.data().get(2)); + + assertEquals("No data yet",0,source1Data.asList().size()); + assertEquals("No data yet",0,source2Data.asList().size()); + assertEquals(3,source3Data.asList().size()); + + // complete async data in source1 + futureSource1.incomingData.get(0).addLast(new StringData(request,"source1Data")); + assertEquals("Not visible yet", 0, source1Data.asList().size()); + source1Data.complete().get(1000, TimeUnit.MILLISECONDS); + assertEquals(2, source1Data.asList().size()); + assertEquals("source1Data",source1Data.get(0).toString()); + assertEquals("Completion listener chain on this has run", "[source1] Data count: 1", source1Data.get(1).toString()); + + // source2 & main completion + assertEquals("Main completion listener has not run", 3, response.data().asList().size()); + futureSource2.incomingData.get(0).addLast(new StringData(request, "source2Data")); + assertEquals("Main completion listener has not run", 3, response.data().asList().size()); + + Response.recursiveComplete(response.data()).get(); + assertEquals("Main completion listener has run", 4, response.data().asList().size()); + assertEquals("The main data counter saw all sync data, but not source2 data as it executes after this", + "[main] Data count: " + (2 + 0 + 3), response.data().get(3).toString()); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java b/processing/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java new file mode 100644 index 00000000000..1ce4e293104 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/execution/test/StreamingTestCase.java @@ -0,0 +1,107 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution.test; + +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.IncomingData; +import com.yahoo.processing.test.ProcessorLibrary; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests listening on every available new piece of data in a response + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class StreamingTestCase { + + /** Tests adding a chain which is called every time new data is added to a data list */ + @SuppressWarnings("unchecked") + @Test + public void testStreamingData() throws InterruptedException, ExecutionException, TimeoutException { + // Set up + StreamProcessor streamProcessor = new StreamProcessor(); + Chain<Processor> streamProcessing = new Chain<Processor>(streamProcessor); + ProcessorLibrary.FutureDataSource futureDataSource=new ProcessorLibrary.FutureDataSource(); + Chain<Processor> main=new Chain<>(new ProcessorLibrary.DataCounter(), + new ProcessorLibrary.StreamProcessingInitiator(streamProcessing), + futureDataSource); + + // Execute + Request request=new Request(); + Response response= Execution.createRoot(main, 0, Execution.Environment.createEmpty()).process(request); + IncomingData incomingData = futureDataSource.incomingData.get(0); + + // State prior to receiving any additional data + assertEquals(1,response.data().asList().size()); + assertEquals("Data count: 0",response.data().get(0).toString()); + assertEquals("Add data listener invoked also for DataCounter", 1, streamProcessor.invocationCount); + assertEquals("Initial data count", 1, response.data().asList().size()); + + // add first data - we have no listener so the data is held in the incoming buffer + incomingData.add(new ProcessorLibrary.StringData(request, "d1")); + assertEquals("Data add listener not invoked as we are not listening on new data yet",1, streamProcessor.invocationCount); + assertEquals("New data is not consumed", 1, response.data().asList().size()); + + // start listening on incoming data - this is what a renderer will do + incomingData.addNewDataListener(new MockNewDataListener(incomingData), MoreExecutors.sameThreadExecutor()); + assertEquals("We got a data add event for the data which was already added", 2, streamProcessor.invocationCount); + assertEquals("New data is consumed", 2, response.data().asList().size()); + + incomingData.add(new ProcessorLibrary.StringData(request, "d2")); + assertEquals("We are now getting data add events each time", 3, streamProcessor.invocationCount); + assertEquals("New data is consumed", 3, response.data().asList().size()); + + incomingData.addLast(new ProcessorLibrary.StringData(request, "d3")); + assertEquals("We are getting data add events also the last time", 4, streamProcessor.invocationCount); + assertEquals("New data is consumed", 4, response.data().asList().size()); + + response.data().complete().get(1000, TimeUnit.MILLISECONDS); // no-op here + assertEquals("d1",response.data().get(1).toString().toString()); + assertEquals("d2",response.data().get(2).toString().toString()); + assertEquals("d3",response.data().get(3).toString().toString()); + } + + private static class MockNewDataListener implements Runnable { + + private final IncomingData<Data> incomingData; + + public MockNewDataListener(IncomingData<Data> incomingData) { + this.incomingData = incomingData; + } + + @Override + public void run() { + // consume new data + for (Data newData : incomingData.drain()) { + incomingData.getOwner().add(newData); + } + // actual rendering would go here (at this point data add listeners will have executed) + } + + } + + private static class StreamProcessor extends Processor { + + int invocationCount; + + @Override + public Response process(Request request, Execution execution) { + invocationCount++; + return execution.process(request); + } + + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/request/CompoundNameTestCase.java b/processing/src/test/java/com/yahoo/processing/request/CompoundNameTestCase.java new file mode 100644 index 00000000000..0d180bb6dbb --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/request/CompoundNameTestCase.java @@ -0,0 +1,158 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +import static org.junit.Assert.*; + +import java.util.Iterator; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Splitter; +import com.yahoo.text.Lowercase; + +/** + * Module local test of the basic property name building block. + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public class CompoundNameTestCase { + + private static final String NAME = "com.yahoo.processing.request.CompoundNameTestCase"; + private CompoundName cn; + + @Before + public void setUp() throws Exception { + cn = new CompoundName(NAME); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public final void testLast() { + assertEquals(NAME.substring(NAME.lastIndexOf('.') + 1), cn.last()); + } + + @Test + public final void testFirst() { + assertEquals(NAME.substring(0, NAME.indexOf('.')), cn.first()); + } + + @Test + public final void testRest() { + assertEquals(NAME.substring(NAME.indexOf('.') + 1), cn.rest().toString()); + } + + @Test + public final void testRestN() { + assertEquals("a.b.c.d.e", new CompoundName("a.b.c.d.e").rest(0).toString()); + assertEquals("b.c.d.e", new CompoundName("a.b.c.d.e").rest(1).toString()); + assertEquals("c.d.e", new CompoundName("a.b.c.d.e").rest(2).toString()); + assertEquals("d.e", new CompoundName("a.b.c.d.e").rest(3).toString()); + assertEquals("e", new CompoundName("a.b.c.d.e").rest(4).toString()); + assertEquals("", new CompoundName("a.b.c.d.e").rest(5).toString()); + } + + @Test + public final void testPrefix() { + assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName(""))); + assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName("a"))); + assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b"))); + assertTrue(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b.c"))); + + assertFalse(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b.c.d"))); + assertFalse(new CompoundName("a.b.c").hasPrefix(new CompoundName("a.b.d"))); + } + + @Test + public final void testSize() { + Splitter s = Splitter.on('.'); + Iterable<String> i = s.split(NAME); + int n = 0; + for (@SuppressWarnings("unused") String x : i) { + ++n; + } + assertEquals(n, cn.size()); + } + + @Test + public final void testGet() { + String s = cn.get(0); + assertEquals(NAME.substring(0, NAME.indexOf('.')), s); + } + + @Test + public final void testIsCompound() { + assertTrue(cn.isCompound()); + } + + @Test + public final void testIsEmpty() { + assertFalse(cn.isEmpty()); + } + + @Test + public final void testAsList() { + List<String> l = cn.asList(); + Splitter peoplesFront = Splitter.on('.'); + Iterable<String> answer = peoplesFront.split(NAME); + Iterator<String> expected = answer.iterator(); + for (int i = 0; i < l.size(); ++i) { + assertEquals(expected.next(), l.get(i)); + } + assertFalse(expected.hasNext()); + } + + @Test + public final void testEqualsObject() { + assertFalse(cn.equals(NAME)); + assertFalse(cn.equals(null)); + assertTrue(cn.equals(cn)); + assertTrue(cn.equals(new CompoundName(NAME))); + } + + @Test + public final void testEmptyNonEmpty() { + assertTrue(new CompoundName("").isEmpty()); + assertEquals(0, new CompoundName("").size()); + assertFalse(new CompoundName("a").isEmpty()); + assertEquals(1, new CompoundName("a").size()); + CompoundName empty = new CompoundName("a.b.c"); + assertTrue(empty == empty.rest(0)); + assertFalse(empty == empty.rest(1)); + } + + @Test + public final void testGetLowerCasedName() { + assertEquals(Lowercase.toLowerCase(NAME), cn.getLowerCasedName()); + } + + @Test + public void testAppend() { + assertEquals(new CompoundName("a.b.c.d"), new CompoundName("").append(new CompoundName("a.b.c.d"))); + assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a").append(new CompoundName("b.c.d"))); + assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a.b").append(new CompoundName("c.d"))); + assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a.b.c").append(new CompoundName("d"))); + assertEquals(new CompoundName("a.b.c.d"), new CompoundName("a.b.c.d").append(new CompoundName(""))); + } + + @Test + public void empty_CompoundName_is_prefix_of_any_CompoundName() { + CompoundName empty = new CompoundName(""); + + assertTrue(empty.hasPrefix(empty)); + assertTrue(new CompoundName("a").hasPrefix(empty)); + } + + @Test + public void whole_components_must_match_to_be_prefix() { + CompoundName stringPrefix = new CompoundName("a"); + CompoundName name = new CompoundName("aa"); + + assertFalse(name.hasPrefix(stringPrefix)); + } +} diff --git a/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameBenchmark.java b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameBenchmark.java new file mode 100644 index 00000000000..6c512c82903 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameBenchmark.java @@ -0,0 +1,52 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.test; + +import com.yahoo.processing.request.CompoundName; + +/** + * @author balder + */ +public class CompoundNameBenchmark { + public void run() { + long result=0; + String strings[] = createStrings(1000); + // Warm-up + out("Warming up..."); + for (int i=0; i<10*1000; i++) + result+=createCompundName(strings); + + long startTime=System.currentTimeMillis(); + out("Running..."); + for (int i=0; i<100*1000; i++) + result+=createCompundName(strings); + out("Ignore this: " + result); // Make sure we are not fooled by optimization by creating an observable result + long endTime=System.currentTimeMillis(); + out("Compoundification 1000 strings 100.000 times took " + (endTime-startTime) + " ms"); + } + + private final String [] createStrings(int num) { + String strings [] = new String [num]; + for(int i=0; i < strings.length; i++) { + strings[i] = "this.is.a.short.compound.name." + i; + } + return strings; + } + + private final int createCompundName(String [] strings) { + int retval = 0; + for (int i=0; i < strings.length; i++) { + CompoundName n = new CompoundName(strings[i]); + retval += n.size(); + } + return retval; + } + + private void out(String string) { + System.out.println(string); + } + + public static void main(String[] args) { + new CompoundNameBenchmark().run(); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameTestCase.java new file mode 100644 index 00000000000..647000c5f88 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/request/test/CompoundNameTestCase.java @@ -0,0 +1,66 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.test; + +import com.yahoo.processing.request.CompoundName; +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +/** + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class CompoundNameTestCase { + + @Test + public void testFirstRest() { + assertEquals(CompoundName.empty, CompoundName.empty.rest()); + + CompoundName n=new CompoundName("on.two.three"); + assertEquals("on", n.first()); + assertEquals("two.three", n.rest().toString()); + n=n.rest(); + assertEquals("two", n.first()); + assertEquals("three", n.rest().toString()); + n=n.rest(); + assertEquals("three", n.first()); + assertEquals("", n.rest().toString()); + n=n.rest(); + assertEquals("", n.first()); + assertEquals("", n.rest().toString()); + n=n.rest(); + assertEquals("", n.first()); + assertEquals("", n.rest().toString()); + } + + @Test + public void testHashCodeAndEquals() { + CompoundName n1 = new CompoundName("venn.d.a"); + CompoundName n2 = new CompoundName(n1.asList()); + assertEquals(n1.hashCode(), n2.hashCode()); + assertEquals(n1, n2); + } + + @Test + public void testAppend() { + assertEquals("a",new CompoundName("a").append("").toString()); + assertEquals("a",new CompoundName("").append("a").toString()); + assertEquals("a.b",new CompoundName("a").append("b").toString()); + + CompoundName name = new CompoundName("a.b"); + assertEquals("a.b.c",name.append("c").toString()); + assertEquals("a.b.d",name.append("d").toString()); + } + + @Test + public void testEmpty() { + CompoundName empty=new CompoundName(""); + assertEquals("", empty.toString()); + assertEquals(0, empty.asList().size()); + } + + @Test + public void testAsList() { + assertEquals("[one]", new CompoundName("one").asList().toString()); + assertEquals("[one, two, three]", new CompoundName("one.two.three").asList().toString()); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/request/test/ErrorMessageTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/ErrorMessageTestCase.java new file mode 100644 index 00000000000..82fd25f9754 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/request/test/ErrorMessageTestCase.java @@ -0,0 +1,54 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.test; + +import com.yahoo.processing.Request; +import com.yahoo.processing.request.ErrorMessage; +import org.junit.Test; + +/** + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class ErrorMessageTestCase extends junit.framework.TestCase { + + @Test + public void testToString() { + assertEquals("message",new ErrorMessage("message").toString()); + assertEquals("message: hello",new ErrorMessage("message",new Exception("hello")).toString()); + assertEquals("message: detail",new ErrorMessage("message","detail").toString()); + assertEquals("37: message: detail",new ErrorMessage(37,"message","detail").toString()); + assertEquals("message: detail: hello",new ErrorMessage("message","detail",new Exception("hello")).toString()); + assertEquals("message: detail: hello: world",new ErrorMessage("message","detail",new Exception("hello",new Exception("world"))).toString()); + assertEquals("message: detail: hello: Exception",new ErrorMessage("message","detail",new Exception("hello",new Exception())).toString()); + assertEquals("message: detail: hello",new ErrorMessage("message","detail",new Exception(new Exception("hello"))).toString()); + assertEquals("message: detail: java.lang.Exception: Exception",new ErrorMessage("message","detail",new Exception(new Exception())).toString()); + } + + @Test + public void testAccessors() { + ErrorMessage m = new ErrorMessage(37,"message","detail",new Exception("hello")); + assertEquals(37,m.getCode()); + assertEquals("message",m.getMessage()); + assertEquals("detail",m.getDetailedMessage()); + assertEquals("hello",m.getCause().getMessage()); + } + + @Test + public void testEquality() { + assertEquals(new ErrorMessage(37,"message","detail",new Exception("hello")), + new ErrorMessage(37,"message","detail",new Exception("hello"))); + assertEquals(new ErrorMessage("message","detail",new Exception("hello")), + new ErrorMessage("message","detail",new Exception("hello"))); + assertEquals(new ErrorMessage("message",new Exception("hello")), + new ErrorMessage("message",new Exception("hello"))); + assertEquals(new ErrorMessage("message"), + new ErrorMessage("message")); + assertEquals(new ErrorMessage("message",new Exception()), + new ErrorMessage("message")); + assertFalse(new ErrorMessage("message").equals(new ErrorMessage("message","detail"))); + assertFalse(new ErrorMessage(37,"message").equals(new ErrorMessage("message"))); + assertFalse(new ErrorMessage(37,"message").equals(new ErrorMessage(38,"message"))); + assertFalse(new ErrorMessage("message","detail1").equals(new ErrorMessage("message","detail2"))); + assertFalse(new ErrorMessage("message1").equals(new ErrorMessage("message2"))); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/request/test/PropertyMapTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/PropertyMapTestCase.java new file mode 100644 index 00000000000..61ef75c55a2 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/request/test/PropertyMapTestCase.java @@ -0,0 +1,81 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.test; + +import com.yahoo.processing.request.properties.PropertyMap; +import com.yahoo.processing.request.properties.PublicCloneable; + +import java.util.Collections; +import java.util.List; + +/** + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class PropertyMapTestCase extends junit.framework.TestCase { + + public void testCloning() { + PropertyMap map=new PropertyMap(); + map.set("clonable",new ClonableObject()); + map.set("publicClonable",new PublicClonableObject()); + map.set("nonclonable",new NonClonableObject()); + map.set("clonableArray",new ClonableObject[] {new ClonableObject()}); + map.set("publicClonableArray",new ClonableObject[] {new ClonableObject()}); + map.set("nonclonableArray",new NonClonableObject[] {new NonClonableObject()}); + map.set("clonableList", Collections.singletonList(new ClonableObject())); + map.set("nonclonableList", Collections.singletonList(new NonClonableObject())); + assertNotNull(map.get("clonable")); + assertNotNull(map.get("nonclonable")); + + PropertyMap mapClone=map.clone(); + assertTrue(map.get("clonable") != mapClone.get("clonable")); + assertTrue(map.get("publicClonable")!= mapClone.get("publicClonable")); + assertTrue(map.get("nonclonable") == mapClone.get("nonclonable")); + + assertTrue(map.get("clonableArray") != mapClone.get("clonableArray")); + assertTrue(first(map.get("clonableArray")) != first(mapClone.get("clonableArray"))); + assertTrue(map.get("publicClonableArray") != mapClone.get("publicClonableArray")); + assertTrue(first(map.get("publicClonableArray")) != first(mapClone.get("publicClonableArray"))); + assertTrue(first(map.get("nonclonableArray")) == first(mapClone.get("nonclonableArray"))); + } + + private Object first(Object object) { + if (object instanceof Object[]) + return ((Object[])object)[0]; + if (object instanceof List) + return ((List<?>)object).get(0); + throw new IllegalArgumentException(); + } + + public static class ClonableObject implements Cloneable { + + @Override + public ClonableObject clone() { + try { + return (ClonableObject)super.clone(); + } + catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + } + + } + + public static class PublicClonableObject implements PublicCloneable<PublicClonableObject> { + + @Override + public PublicClonableObject clone() { + try { + return (PublicClonableObject)super.clone(); + } + catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + } + + } + + private static class NonClonableObject { + + } + + +} diff --git a/processing/src/test/java/com/yahoo/processing/request/test/RequestTestCase.java b/processing/src/test/java/com/yahoo/processing/request/test/RequestTestCase.java new file mode 100644 index 00000000000..7045ea1efbd --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/request/test/RequestTestCase.java @@ -0,0 +1,137 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.test; + +import com.yahoo.processing.Request; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.request.Properties; +import com.yahoo.processing.request.properties.PropertyMap; +import org.junit.Test; + +/** + * Tests using requests + * + * @author bratseth + */ +public class RequestTestCase extends junit.framework.TestCase { + + @Test + public void testProperties() { + Properties p = new PropertyMap(); + p.set("a", "a1"); + Request r = new Request(p); + r.properties().set("b", "b1"); + assertEquals(2, r.properties().listProperties().size()); + assertEquals("a1", r.properties().get("a")); + + assertEquals("b1", r.properties().get("b")); + assertEquals("b1", r.properties().get("b", "default")); + assertEquals("default", r.properties().get("c", "default")); + assertNull(r.properties().get("c")); + assertEquals("b1", r.properties().get(new CompoundName("b"))); + assertEquals("b1", r.properties().get(new CompoundName("b"), "default")); + assertEquals("default", r.properties().get(new CompoundName("c"), "default")); + assertNull(r.properties().get(new CompoundName("c"))); + + assertEquals("b1",r.properties().getString("b")); + assertEquals("b1",r.properties().getString("b","default")); + assertEquals("default",r.properties().getString("c","default")); + assertEquals(null,r.properties().getString("c")); + assertEquals("b1",r.properties().getString(new CompoundName("b"))); + assertEquals("b1",r.properties().getString(new CompoundName("b"),"default")); + assertEquals("default",r.properties().getString(new CompoundName("c"),"default")); + assertEquals(null,r.properties().getString(new CompoundName("c"))); + + r.properties().set("i",7); + assertEquals(7,(int)r.properties().getInteger("i")); + assertEquals(7,(int)r.properties().getInteger("i",3)); + assertEquals(3,(int)r.properties().getInteger("n",3)); + assertNull(r.properties().getInteger("n")); + assertEquals(7,(int)r.properties().getInteger(new CompoundName("i"))); + assertEquals(7,(int)r.properties().getInteger(new CompoundName("i"),3)); + assertEquals(3,(int)r.properties().getInteger(new CompoundName("n"),3)); + assertNull(r.properties().getInteger("n")); + + r.properties().set(new CompoundName("l"),7); + assertEquals(7, (long) r.properties().getLong("l")); + assertEquals(7,(long)r.properties().getLong("l",3l)); + assertEquals(3,(long)r.properties().getLong("m",3l)); + assertNull(r.properties().getInteger("m")); + assertEquals(7,(long)r.properties().getLong(new CompoundName("l"))); + assertEquals(7,(long)r.properties().getLong(new CompoundName("l"),3l)); + assertEquals(3,(long)r.properties().getLong(new CompoundName("m"),3l)); + assertNull(r.properties().getInteger("m")); + + r.properties().set("d",7.3); + assertEquals(7.3,r.properties().getDouble("d")); + assertEquals(7.3,r.properties().getDouble("d",3.4d)); + assertEquals(3.4,r.properties().getDouble("f",3.4d)); + assertNull(r.properties().getDouble("f")); + assertEquals(7.3,r.properties().getDouble(new CompoundName("d"))); + assertEquals(7.3,r.properties().getDouble(new CompoundName("d"),3.4d)); + assertEquals(3.4,r.properties().getDouble(new CompoundName("f"),3.4d)); + assertNull(r.properties().getDouble("f")); + + r.properties().set("o",true); + assertEquals(true,r.properties().getBoolean("o")); + assertEquals(true,r.properties().getBoolean("o",true)); + assertEquals(true,r.properties().getBoolean("g",true)); + assertEquals(false, r.properties().getBoolean("g")); + assertEquals(true,r.properties().getBoolean(new CompoundName("o"))); + assertEquals(true,r.properties().getBoolean(new CompoundName("o"),true)); + assertEquals(true,r.properties().getBoolean(new CompoundName("g"),true)); + assertEquals(false, r.properties().getBoolean("g")); + + r.properties().set(new CompoundName("x.y"), "x1.y1"); + r.properties().set("x.z", "x1.z1"); + + assertEquals(8, r.properties().listProperties().size()); + assertEquals(0, r.properties().listProperties("a").size()); + assertEquals(0, r.properties().listProperties(new CompoundName("a")).size()); + assertEquals(0, r.properties().listProperties(new CompoundName("none")).size()); + assertEquals(2, r.properties().listProperties(new CompoundName("x")).size()); + assertEquals(2, r.properties().listProperties("x").size()); + } + + @Test + public void testErrorMessages() { + Request r = new Request(); + r.errors().add(new ErrorMessage("foo")); + r.errors().add(new ErrorMessage("bar")); + assertEquals(2,r.errors().size()); + assertEquals("foo",r.errors().get(0).getMessage()); + assertEquals("bar",r.errors().get(1).getMessage()); + + } + + @Test + public void testCloning() { + Request request = new Request(); + request.properties().set("a","a1"); + request.properties().set("b","b1"); + request.errors().add(new ErrorMessage("foo")); + request.errors().add(new ErrorMessage("bar")); + Request rcloned = request.clone(); + rcloned.properties().set("c", "c1"); + rcloned.errors().add(new ErrorMessage("baz")); + request.properties().set("d", "d1"); + request.errors().add(new ErrorMessage("boz")); + + assertEquals("a1",request.properties().get("a")); + assertEquals("a1",rcloned.properties().get("a")); + assertEquals("b1",request.properties().get("b")); + assertEquals("b1",rcloned.properties().get("b")); + assertEquals(null,request.properties().get("c")); + assertEquals("c1",rcloned.properties().get("c")); + assertEquals("d1",request.properties().get("d")); + assertEquals(null,rcloned.properties().get("d")); + + assertEquals(3,request.errors().size()); + assertEquals(1,rcloned.errors().size()); + assertEquals("foo",request.errors().get(0).getMessage()); + assertEquals("bar",request.errors().get(1).getMessage()); + assertEquals("boz",request.errors().get(2).getMessage()); + assertEquals("baz",rcloned.errors().get(0).getMessage()); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/test/DocumentationTestCase.java b/processing/src/test/java/com/yahoo/processing/test/DocumentationTestCase.java new file mode 100644 index 00000000000..ea51e9079cc --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/test/DocumentationTestCase.java @@ -0,0 +1,44 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.test.documentation.AsyncDataProcessingInitiator; +import com.yahoo.processing.test.documentation.AsyncDataProducer; +import com.yahoo.processing.test.documentation.ExampleProcessor; +import com.yahoo.processing.test.documentation.Federator; + +/** + * See to it we can actually run the examples in the doc. + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public class DocumentationTestCase { + + @SuppressWarnings("unchecked") + @Test + public final void test() { + Processor p = new ExampleProcessor(); + Chain<Processor> basic = new Chain<>(p); + Processor initiator = new AsyncDataProcessingInitiator(basic); + Chain<Processor> postProcessing = new Chain<>(initiator); + Execution e = Execution.createRoot(postProcessing, 0, Execution.Environment.createEmpty()); + Response r = e.process(new Request()); + // just adds a listener to the result returned from basic + assertEquals(0, r.data().asList().size()); + Processor producer = new AsyncDataProducer(); + Chain<Processor> asyncChain = new Chain<>(producer); + Processor federator = new Federator(basic, asyncChain); + e = Execution.createRoot(federator, 0, Execution.Environment.createEmpty()); + r = e.process(new Request()); + assertEquals(2, r.data().asList().size()); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/test/ProcessingTestCase.java b/processing/src/test/java/com/yahoo/processing/test/ProcessingTestCase.java new file mode 100644 index 00000000000..4f306773c2a --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/test/ProcessingTestCase.java @@ -0,0 +1,60 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import org.junit.Test; + +import static com.yahoo.processing.test.ProcessorLibrary.*; +import static org.junit.Assert.assertEquals; + +/** + * Tests the basic of the processing framework + */ +public class ProcessingTestCase { + + /** Execute three simple processors doing some phony processing */ + @Test + public void testChainedProcessing1() { + // Create a chain + Chain<Processor> chain=new Chain<>(new CombineData(),new Get6DataItems(), new DataSource()); + + // Execute it + Request request=new Request(); + request.properties().set("appendage",1); + Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request); + + // Verify the result + assertEquals(6-1,response.data().asList().size()); + assertEquals("first.2, third.2",response.data().get(0).toString()); + assertEquals("second.2",response.data().get(1).toString()); + assertEquals("first.3",response.data().get(2).toString()); + assertEquals("second.3",response.data().get(3).toString()); + assertEquals("third.3",response.data().get(4).toString()); + } + + /** Execute the same processors in a different order */ + @Test + public void testChainedProcessing2() { + // Create a chain + Chain<Processor> chain=new Chain<>(new Get6DataItems(),new CombineData(), new DataSource()); + + // Execute it + Request request=new Request(); + request.properties().set("appendage",1); + Response response=Execution.createRoot(chain,0,Execution.Environment.createEmpty()).process(request); + + // Check the result + assertEquals(6,response.data().asList().size()); + assertEquals("first.2, third.2",response.data().get(0).toString()); + assertEquals("second.2",response.data().get(1).toString()); + assertEquals("first.4, third.4",response.data().get(2).toString()); + assertEquals("second.4",response.data().get(3).toString()); + assertEquals("first.6, third.6",response.data().get(4).toString()); + assertEquals("second.6",response.data().get(5).toString()); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java new file mode 100644 index 00000000000..afda8a7fe96 --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProcessingInitiator.java @@ -0,0 +1,30 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test.documentation; + +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.*; +import com.yahoo.processing.execution.*; + +/** + * A processor which registers a listener on the future completion of + * asynchronously arriving data to perform another chain at that point. + */ +public class AsyncDataProcessingInitiator extends Processor { + + private final Chain<Processor> asyncChain; + + public AsyncDataProcessingInitiator(Chain<Processor> asyncChain) { + this.asyncChain=asyncChain; + } + + @Override + public Response process(Request request, Execution execution) { + Response response=execution.process(request); + response.data().complete().addListener(new RunnableExecution(request, + new ExecutionWithResponse(asyncChain, response, execution)), + MoreExecutors.sameThreadExecutor()); + return response; + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProducer.java b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProducer.java new file mode 100644 index 00000000000..f2a51e240cc --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/test/documentation/AsyncDataProducer.java @@ -0,0 +1,37 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test.documentation; + +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.response.ArrayDataList; +import com.yahoo.processing.response.DataList; +import com.yahoo.processing.response.IncomingData; +import com.yahoo.processing.test.ProcessorLibrary.StringData; + +/** + * A data producer which producer data which will receive asynchronously. + * This is not a realistic, thread safe implementation as only the incoming data + * from the last created incoming data can be completed. + */ +public class AsyncDataProducer extends Processor { + + private IncomingData incomingData; + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + DataList dataList = ArrayDataList.createAsync(request); // Default implementation + incomingData=dataList.incoming(); + return new Response(dataList); + } + + /** Called by some other data producing thread, later */ + @SuppressWarnings("unchecked") + public void completeLateData() { + incomingData.addLast(new StringData(incomingData.getOwner().request(), + "A late hello, world!")); + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/ExampleProcessor.java b/processing/src/test/java/com/yahoo/processing/test/documentation/ExampleProcessor.java new file mode 100644 index 00000000000..c87d508676d --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/test/documentation/ExampleProcessor.java @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test.documentation; + +import com.yahoo.processing.*; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.test.ProcessorLibrary.StringData; + +public class ExampleProcessor extends Processor { + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + // Process the Request: + request.properties().set("foo","bar"); + + // Pass it down the chain to get a response + Response response=execution.process(request); + + // process the response + response.data().add(new StringData(request,"Hello, world!")); + + return response; + } + +} diff --git a/processing/src/test/java/com/yahoo/processing/test/documentation/Federator.java b/processing/src/test/java/com/yahoo/processing/test/documentation/Federator.java new file mode 100644 index 00000000000..c69bdf0c85c --- /dev/null +++ b/processing/src/test/java/com/yahoo/processing/test/documentation/Federator.java @@ -0,0 +1,44 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test.documentation; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.AsyncExecution; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.response.FutureResponse; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Call a number of chains in parallel + */ +public class Federator extends Processor { + + private final List<Chain<? extends Processor>> chains; + + @SafeVarargs + public Federator(Chain<? extends Processor> ... chains) { + this.chains= Arrays.asList(chains); + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + List<FutureResponse> futureResponses=new ArrayList<>(chains.size()); + for (Chain<? extends Processor> chain : chains) { + futureResponses.add(new AsyncExecution(chain,execution).process(request)); + } + Response response=execution.process(request); + AsyncExecution.waitForAll(futureResponses,1000); + for (FutureResponse futureResponse : futureResponses) { + Response federatedResponse=futureResponse.get(); + response.data().add(federatedResponse.data()); + response.mergeWith(federatedResponse); + } + return response; + } +} |