diff options
Diffstat (limited to 'container-core/src/main/java/com/yahoo/processing/execution')
8 files changed, 774 insertions, 0 deletions
diff --git a/container-core/src/main/java/com/yahoo/processing/execution/AsyncExecution.java b/container-core/src/main/java/com/yahoo/processing/execution/AsyncExecution.java new file mode 100644 index 00000000000..2c40165f8e5 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/AsyncExecution.java @@ -0,0 +1,156 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.component.chain.Chain; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.FutureResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; + +/** + * <p>Provides asynchronous execution of processing chains. Usage:</p> + * + * <pre> + * Execution execution = new Execution(chain); + * AsyncExecution asyncExecution = new AsyncExecution(execution); + * Future<Response> future = asyncExecution.process(request) + * try { + * result = future.get(timeout, TimeUnit.milliseconds); + * } catch(TimeoutException e) { + * // Handle timeout + * } + * </pre> + * + * <p> + * The request is not thread safe. A clone() must be made for each parallel processing. + * </p> + * + * @author bratseth + * @see Execution + */ +public class AsyncExecution { + + private static final ThreadFactory threadFactory = ThreadFactoryFactory.getThreadFactory("processing"); + + private static final Executor executorMain = createExecutor(); + private static final Executor createExecutor() { + ThreadPoolExecutor executor = new ThreadPoolExecutor(100, + Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(false), threadFactory); + // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also + // get the dreaded thread locals initialized even if they will never run. + // That counters what we we want to achieve with the Q that will prefer thread locality. + executor.prestartAllCoreThreads(); + return executor; + } + + /** + * The execution of this + */ + private final Execution execution; + + /** + * Create an async execution of a single processor + */ + public AsyncExecution(Processor processor, Execution parent) { + this(new Execution(processor, parent)); + } + + /** + * Create an async execution of a chain + */ + public AsyncExecution(Chain<? extends Processor> chain, Execution parent) { + this(new Execution(chain, parent)); + } + + /** + * Creates an async execution from an existing execution. This async + * execution will execute the chain from the given execution, starting + * from the next processor in that chain. This is handy to execute + * multiple executions to the rest of the chain in parallel. + * <p> + * The state of the given execution is read on construction of this and not + * used later - the argument execution can be reused for other purposes. + * + * @param execution the execution from which the state of this is created + */ + public AsyncExecution(Execution execution) { + this.execution = new Execution(execution); + } + + /** + * Performs an async processing. Note that the given request cannot be simultaneously + * used in multiple such processings - a clone must be created for each. + */ + public FutureResponse process(Request request) { + return getFutureResponse(new Callable<Response>() { + @Override + public Response call() { + return execution.process(request); + } + }, request); + } + + private static <T> Future<T> getFuture(final Callable<T> callable) { + FutureTask<T> future = new FutureTask<>(callable); + executorMain.execute(future); + return future; + } + + private FutureResponse getFutureResponse(Callable<Response> callable, Request request) { + FutureResponse future = new FutureResponse(callable, execution, request); + executorMain.execute(future.delegate()); + return future; + } + + /* + * Waits for all futures until the given timeout. If a FutureResult isn't + * done when the timeout expires, it will be cancelled, and it will return a + * response. All unfinished Futures will be cancelled. + * + * @return the list of responses in the same order as returned from the task collection + */ + // Note that this may also be achieved using guava Futures. Not sure if this should be deprecated because of it. + public static List<Response> waitForAll(Collection<FutureResponse> tasks, long timeout) { + + // Copy the list in case it is modified while we are waiting + List<FutureResponse> workingTasks = new ArrayList<>(tasks); + @SuppressWarnings({"rawtypes", "unchecked"}) + Future task = getFuture(new Callable() { + @Override + public List<Future> call() { + for (FutureResponse task : workingTasks) { + task.get(); + } + return null; + } + }); + + try { + task.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + // Handle timeouts below + } + + List<Response> responses = new ArrayList<>(tasks.size()); + for (FutureResponse future : workingTasks) + responses.add(getTaskResponse(future)); + return responses; + } + + private static Response getTaskResponse(FutureResponse future) { + if (future.isDone() && !future.isCancelled()) { + return future.get(); // Since isDone() = true, this won't block. + } else { // Not done and no errors thrown + return new Response(future.getRequest(), new ErrorMessage("Timed out waiting for " + future)); + } + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/Execution.java b/container-core/src/main/java/com/yahoo/processing/execution/Execution.java new file mode 100644 index 00000000000..98bc3485084 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/Execution.java @@ -0,0 +1,487 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.collections.Pair; +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.chain.ChainRegistry; +import com.yahoo.yolean.trace.TraceNode; +import com.yahoo.yolean.trace.TraceVisitor; + +import java.util.Iterator; + +/** + * An execution of a chain. This keeps tracks of the progress of the execution and is called by the + * processors (using {@link #process} to move the execution to the next one. + * + * @author bratseth + */ +public class Execution { + + /** + * The index of the searcher in the chain which should be executed on the next call + * An Execution instance contains the state of a chain execution by + * providing a class stack for a chain - when a processor is called + * (through this), it will increment the index of the processor to call next, + * each time a processor returns (regardless of how) it will do the opposite. + */ + private int processorIndex; + + private final Chain<? extends Processor> chain; + + private final Trace trace; + + private final Environment<? extends Processor> environment; + + /** + * Creates an execution of a single processor + * + * @param processor the processor to execute in this + * @param execution the parent execution of this + */ + public Execution(Processor processor, Execution execution) { + this(new Chain<>(processor), execution); + } + + /** Creates an execution of a single processor which is not in the context of an existing execution */ + public static Execution createRoot(Processor processor, int traceLevel, Environment<? extends Processor> environment) { + return createRoot(new Chain<>(processor), traceLevel, environment); + } + + /** + * Creates an execution which is not in the context of an existing execution + * + * @param chain the chain to execute + * @param traceLevel the level to emit trace at + * @param environment the execution environment to use + */ + public static Execution createRoot(Chain<? extends Processor> chain, int traceLevel, + Environment<? extends Processor> environment) { + return new Execution(chain, 0, Trace.createRoot(traceLevel), environment); + } + + /** + * Creates an execution of a chain + * + * @param chain the chain to execute in this, starting from the first processor + * @param execution the parent execution of this + */ + public Execution(Chain<? extends Processor> chain, Execution execution) { + this(chain, 0, execution.trace().createChild(), execution.environment().nested()); + } + + /** + * Creates an execution from another. This execution will start at the next processor of the + * given execution. The given execution can continue independently of this. + */ + public Execution(Execution startPoint) { + this(startPoint.chain, startPoint.processorIndex, startPoint.trace.createChild(), startPoint.environment().nested()); + } + + /** + * Creates a new execution by setting the internal state directly. + * + * @param chain the chain to execute + * @param startIndex the start index into that chain + * @param trace the context <b>of this</b>. If this is created from an existing execution/context, + * be sure to do <code> new Context<COMPONENT>(startPoint.context)</code> first! + * @param environment the static execution environment to use + */ + protected Execution(Chain<? extends Processor> chain, int startIndex, Trace trace, Environment<? extends Processor> environment) { + if (chain == null) throw new NullPointerException("Chain cannot be null"); + this.chain = chain; + this.processorIndex = startIndex; + this.trace = trace; + this.environment = environment; + } + + /** + * Calls process on the next processor in this chain. If there is no next, an empty response is returned. + */ + public Response process(Request request) { + Processor processor = next(); + if (processor == null) + return defaultResponse(request); + + Response response = null; + try { + nextProcessor(); + onInvoking(request, processor); + response = processor.process(request, this); + if (response == null) + throw new NullPointerException(processor + " returned null, not a Response object"); + return response; + } finally { + previousProcessor(); + onReturning(request, processor, response); + } + } + + /** + * Returns the index into the chain of processors which is currently next + */ + protected int nextIndex() { + return processorIndex; + } + + /** + * A hook called when a processor is to be invoked. Overriding methods must call super.onInvoking + */ + protected void onInvoking(Request request, Processor next) { + if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) { + int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel(); + trace.trace("Invoke " + next, traceAt); + } + if (Trace.Level.Dependencies.includes(trace.getTraceLevel())) + trace.trace(next.getId() + " " + next.getDependencies().toString(), trace.getTraceLevel()); + } + + /** + * A hook called when a processor returns, either normally or by throwing. + * Overriding methods must call super.onReturning + * + * @param request the processing request + * @param processor the processor which returned + * @param response the response returned, or null if the processor returned by throwing + */ + protected void onReturning(Request request, Processor processor, Response response) { + if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) { + int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel(); + trace.trace("Return " + processor, traceAt); + } + } + + /** Move this execution to the previous processor */ + protected void previousProcessor() { + processorIndex--; + } + + /** Move this execution to the next processor */ + protected void nextProcessor() { + processorIndex++; + } + + /** Returns the next searcher to be invoked in this chain, or null if we are at the last */ + protected Processor next() { + if (chain.components().size() <= processorIndex) return null; + return chain.components().get(processorIndex); + } + + /** Returns the chain this executes */ + public Chain<? extends Processor> chain() { + return chain; + } + + /** + * Creates the default response to return from this kind of execution when there are no further processors. + * If this is overridden, make sure to propagate any freezeListener from this to the returned response + * top-level DataList. + */ + protected Response defaultResponse(Request request) { + return new Response(request); + } + + public String toString() { + return "execution of chain '" + chain.getId() + "'"; + } + + public Trace trace() { + return trace; + } + + public Environment<? extends Processor> environment() { + return environment; + } + + /** + * Holds the static execution environment for the duration of an execution + */ + public static class Environment<COMPONENT extends Processor> { + + private final ChainRegistry<COMPONENT> chainRegistry; + + /** + * Creates an empty environment. Only useful for some limited testing + */ + public static <C extends Processor> Environment<C> createEmpty() { + return new Environment<>(new ChainRegistry<>()); + } + + /** + * Returns an environment for an execution spawned from the execution having this environment. + */ + public Environment<COMPONENT> nested() { + return this; // this is immutable, subclasses might want to do something else though + } + + /** + * Creates a new environment + */ + public Environment(ChainRegistry<COMPONENT> chainRegistry) { + this.chainRegistry = chainRegistry; + } + + /** + * Returns the processing chain registry of this execution environment. + * The registry may be empty, but never null. + */ + public ChainRegistry<COMPONENT> chainRegistry() { + return chainRegistry; + } + + } + + /** + * Tre trace of this execution. This is a facade into a node in the larger trace tree which captures + * the information about all executions caused by some request + * + * @author bratseth + */ + public static class Trace { + + /** + * The node in the trace tree capturing this execution + */ + private final TraceNode traceNode; + + /** + * The highest level of tracing this should record + */ + private int traceLevel; + + /** + * If true, do timing logic, even though trace level is low. + */ + private boolean forceTimestamps; + + /** + * Creates an empty root trace with a given level of tracing + */ + public static Trace createRoot(int traceLevel) { + return new Trace(traceLevel, new TraceNode(null, timestamp(traceLevel, false)), false); + } + + /** + * Creates a trace node below a parent + */ + public Trace createChild() { + TraceNode child = new TraceNode(null, timestamp(traceLevel, forceTimestamps)); + traceNode.add(child); + return new Trace(getTraceLevel(), child, forceTimestamps); + } + + /** + * Creates a new instance by assigning the internal state of this directly + */ + private Trace(int traceLevel, TraceNode traceNode, boolean forceTimestamps) { + this.traceLevel = traceLevel; + this.traceNode = traceNode; + this.forceTimestamps = forceTimestamps; + } + + /** + * Returns the maximum trace level this will record + */ + public int getTraceLevel() { + return traceLevel; + } + + /** + * Sets the maximum trace level this will record + */ + public void setTraceLevel(int traceLevel) { + this.traceLevel = traceLevel; + } + + public void setForceTimestamps(boolean forceTimestamps) { + this.forceTimestamps = forceTimestamps; + } + + public boolean getForceTimestamps() { + return forceTimestamps; + } + + /** + * Adds a trace message to this trace, if this trace has at most the given trace level + */ + public void trace(String message, int traceLevel) { + trace((Object)message, traceLevel); + } + public void trace(Object message, int traceLevel) { + if (this.traceLevel >= traceLevel) { + traceNode.add(new TraceNode(message, timestamp(traceLevel, forceTimestamps))); + } + } + + /** + * Adds a key-value which will be logged to the access log of this request. + * Multiple values may be set to the same key. A value cannot be removed once set, + * but it can be overwritten by adding another value for the same key. + */ + public void logValue(String key, String value) { + traceNode.add(new TraceNode(new LogValue(key, value), 0)); + } + + /** + * Returns the values that should be written to the access log set in the entire trace node tree + */ + public Iterator<LogValue> logValueIterator() { + return traceNode.root().descendants(LogValue.class).iterator(); + } + + /** + * Visits the entire trace tree + * + * @return the argument visitor for convenience + */ + public <VISITOR extends TraceVisitor> VISITOR accept(VISITOR visitor) { + return traceNode.root().accept(visitor); + } + + /** + * Adds a property key-value to this trace. + * Values are looked up by reverse depth-first search in the trace node tree. + * + * @param name the name of the property + * @param value the value of the property, or null to set this property to null + */ + public void setProperty(String name, Object value) { + traceNode.add(new TraceNode(new Pair<>(name, value), 0)); + } + + /** + * Returns a property set anywhere in the trace tree this points to. + * Note that even though this call is itself "thread robust", the object values returned + * may in some scenarios not be written behind a synchronization barrier, so when accessing + * objects which are not inherently thread safe, synchronization should be considered. + * <p> + * This method have a time complexity which is proportional to + * the number of trace nodes in the tree + * + * @return the value of this property, or null if none + */ + public Object getProperty(String name) { + return accept(new PropertyValueVisitor(name)).foundValue(); + } + + /** + * Returns the trace node peer of this + */ + public TraceNode traceNode() { + return traceNode; + } + + /** + * Returns a short string description of this + */ + @Override + public String toString() { + return "trace: " + traceNode; + } + + private static long timestamp(int traceLevel, boolean forceTimestamps) { + return (forceTimestamps || Level.Timestamp.includes(traceLevel)) ? System.currentTimeMillis() : 0; + } + + /** + * Visits all trace nodes to collect the last set value of a particular property in a trace tree + */ + private static class PropertyValueVisitor extends TraceVisitor { + + /** + * The name of the property to find + */ + private String name; + private Object foundValue = null; + + public PropertyValueVisitor(String name) { + this.name = name; + } + + @Override + public void visit(TraceNode node) { + if (node.payload() == null) return; + if (!(node.payload() instanceof Pair)) return; + + Pair property = (Pair) node.payload(); + if (!property.getFirst().equals(name)) return; + foundValue = property.getSecond(); + } + + public Object foundValue() { + return foundValue; + } + + } + + /** + * An immutable access log value added to the trace + */ + public static class LogValue { + + private final String key; + private final String value; + + public LogValue(String key, String value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return key + "=" + value; + } + + } + + /** + * Defines what information is added at which trace level + */ + public enum Level { + + /** + * Every processing step initiated is traced + */ + Step(4), + /** + * All trace messages are timestamped + */ + Timestamp(6), + /** + * The before/after dependencies of each processing step is traced on every invocation + */ + Dependencies(7); + + /** + * The smallest trace level at which this information will be traced + */ + private int value; + + Level(int value) { + this.value = value; + } + + public int value() { + return value; + } + + /** + * Returns whether this level includes the given level, i.e whether traceLevel is this.value() or more + */ + public boolean includes(int traceLevel) { + return traceLevel >= this.value(); + } + + } + } +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java b/container-core/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java new file mode 100644 index 00000000000..a95771c1202 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; + +/** + * An execution which has a response which is returned when this gets to the end of the chain. + * This is useful to run processing chains where a response exists up front, typically for on completion listeners. + * + * @author bratseth + */ +public class ExecutionWithResponse extends Execution { + + private Response response; + + /** + * Creates an execution which will return a given response at the end of the chain. + * + * @param chain the chain to execute in this + * @param response the response this will return from {@link #process} then the end of this chain is reached + * @param execution the the parent of this execution + */ + public ExecutionWithResponse(Chain<? extends Processor> chain, Response response, Execution execution) { + super(chain, execution); + this.response = response; + } + + @Override + protected Response defaultResponse(Request request) { + return response; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java b/container-core/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java new file mode 100644 index 00000000000..053459166e6 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java @@ -0,0 +1,17 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.processing.Response; + +/** + * An interface for classes which can be given responses. + * Freeze listeners may implement this to be handed the response + * before they are run. There is probably no other sensible use for this. + * + * @author bratseth + */ +public interface ResponseReceiver { + + public void setResponse(Response response); + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/RunnableExecution.java b/container-core/src/main/java/com/yahoo/processing/execution/RunnableExecution.java new file mode 100644 index 00000000000..e69cb8e48cd --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/RunnableExecution.java @@ -0,0 +1,52 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; + +/** + * An adaptor of an Execution to a runnable. Calling run on this causes process to be called on the + * given processor. + * + * @author bratseth + */ +public class RunnableExecution implements Runnable { + + private final Request request; + private final Execution execution; + private Response response = null; + private Throwable exception = null; + + public RunnableExecution(Request request, Execution execution) { + this.request = request; + this.execution = execution; + } + + /** + * Calls process on the execution of this. + * This will result in either response or exception being set on this. + * Calling this never throws an exception. + */ + public void run() { + try { + response = execution.process(request); + } catch (Exception e) { + exception = e; // TODO: Log + } + } + + /** + * Returns the response from executing this, or null if exception is set or run() has not been called yet + */ + public Response getResponse() { + return response; + } + + /** + * Returns the exception from executing this, or null if response is set or run() has not been called yet + */ + public Throwable getException() { + return exception; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java b/container-core/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java new file mode 100644 index 00000000000..dbf31de1b72 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java @@ -0,0 +1,14 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution.chain; + +import com.yahoo.component.chain.Chain; +import com.yahoo.component.chain.ChainedComponent; +import com.yahoo.component.provider.ComponentRegistry; + +/** + * A registry of chains + * + * @author Tony Vaagenes + */ +public class ChainRegistry<T extends ChainedComponent> extends ComponentRegistry<Chain<T>> { +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/chain/package-info.java b/container-core/src/main/java/com/yahoo/processing/execution/chain/package-info.java new file mode 100644 index 00000000000..6d82609bc76 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/chain/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.execution.chain; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-core/src/main/java/com/yahoo/processing/execution/package-info.java b/container-core/src/main/java/com/yahoo/processing/execution/package-info.java new file mode 100644 index 00000000000..6430aa32ae3 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.execution; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; |