diff options
author | gjoranv <gv@verizonmedia.com> | 2021-02-17 16:16:19 +0100 |
---|---|---|
committer | gjoranv <gv@verizonmedia.com> | 2021-02-17 17:13:44 +0100 |
commit | da183fe82e5d9eaccf3cbf03a0751cc74851ec31 (patch) | |
tree | 70cb9d97a7d2bdd5785e7512d7f88d5823e1407e /container-core/src/main/java/com/yahoo/processing | |
parent | a0ae5022c689578e456eba2b5f89ac077e0b07e1 (diff) |
Add java source files from the processing module.
Diffstat (limited to 'container-core/src/main/java/com/yahoo/processing')
34 files changed, 3977 insertions, 0 deletions
diff --git a/container-core/src/main/java/com/yahoo/processing/IllegalInputException.java b/container-core/src/main/java/com/yahoo/processing/IllegalInputException.java new file mode 100644 index 00000000000..3f1605860ed --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/IllegalInputException.java @@ -0,0 +1,25 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +/** + * Thrown on illegal input received from the requesting client. + * Use this instead of the superclass, IllegalArgumentException + * to signal illegal input to the client without causing logging and stack traces, + * + * @author bratseth + */ +public class IllegalInputException extends IllegalArgumentException { + + public IllegalInputException(String message) { + super(message); + } + + public IllegalInputException(Throwable cause) { + super(cause); + } + + public IllegalInputException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/Processor.java b/container-core/src/main/java/com/yahoo/processing/Processor.java new file mode 100644 index 00000000000..359244ff9c9 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/Processor.java @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.yahoo.component.chain.ChainedComponent; +import com.yahoo.processing.execution.Execution; + +/** + * Superclass of chainable components processing Requests to create Responses. + * <p> + * Processors typically changes the Request and/or the Response. It may also make multiple + * forward requests, in series or parallel, or manufacture the response content itself or by calling + * an external service. + * <p> + * Typical usage: + * <code> + * public class MyProcessor extends Processor { + * + * @Override + * public Response process(Request request, Execution execution) { + * // process the request here + * Response response = execution.process(request); // Pass along to get the Response + * // process (or fill in) Data/DataList items on the response here + * return response; + * } + * + * } + * </code> + * + * @author bratseth + */ +public abstract class Processor extends ChainedComponent { + + /** + * Performs a processing request and returns the response + * + * @return a Response instance - never null - containing the data produced by this processor + * and those it forwards the request to + */ + public abstract Response process(Request request, Execution execution); + + +} diff --git a/container-core/src/main/java/com/yahoo/processing/Request.java b/container-core/src/main/java/com/yahoo/processing/Request.java new file mode 100644 index 00000000000..d6607f136fb --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/Request.java @@ -0,0 +1,90 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.yahoo.component.provider.FreezableClass; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.request.Properties; +import com.yahoo.processing.request.properties.PropertyMap; + +import java.util.ArrayList; +import java.util.List; + +/** + * A generic processing request. + * The request contains a set of properties that are used to communicate information from the client making the + * processing request (e.g http parameters), and as a blackboard to pass information between processors. + * + * @author bratseth + */ +public class Request extends FreezableClass implements Cloneable { + + private Properties properties; + + /** + * The errors encountered while processing this request + */ + private List<ErrorMessage> errors = new ArrayList<>(0); + + /** + * The name of the chain of Processor instances which will be invoked when + * executing a request. + */ + public static final CompoundName CHAIN = new CompoundName("chain"); + + /** + * The name of the request property used in the processing framework to + * store the incoming JDisc request. + */ + public static final CompoundName JDISC_REQUEST = new CompoundName("jdisc.request"); + + /** + * Creates a request with no properties + */ + public Request() { + this(new PropertyMap()); + } + + /** + * Create a request with the given properties. + * This Request gains ownership of the given properties and may edit them in the future. + * + * @param properties the properties owner by this + */ + public Request(Properties properties) { + this.properties = properties; + } + + /** + * Returns the properties set on this request. + * Processors may add properties to send messages to downstream processors. + */ + public Properties properties() { + return properties; + } + + /** + * Returns the list of errors encountered while processing this request, never null. + * This is a live reference to the modifiable list of errors of this. + */ + public List<ErrorMessage> errors() { + return errors; + } + + /** + * Returns a clone of this request. + * <p> + * The properties are logically deeply cloned such that changes to properties in the clone are independent. + * <p> + * The errors of the original request <b>are not</b> cloned into the new instance: + * It will have an empty list of errors. + */ + @Override + public Request clone() { + Request clone = (Request) super.clone(); + clone.properties = properties.clone(); + clone.errors = new ArrayList<>(0); + return clone; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/Response.java b/container-core/src/main/java/com/yahoo/processing/Response.java new file mode 100644 index 00000000000..485513cd0cb --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/Response.java @@ -0,0 +1,160 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.concurrent.SystemTimer; +import com.yahoo.processing.execution.ResponseReceiver; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.ArrayDataList; +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.DataList; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A Response to a Request. + * <p> + * A Response contains a list of Data items, which may (through Data implementations) contain payload data and/or + * further nested data lists. + * <p> + * Frameworks built on top of processing may subclass this to create a stricter definition of a response. + * Processors producing Responses should not create subclasses but should instead + * create additional instances/subclasses of Data. Such Processors should always create Response instances by calling + * execution.process(request), which will return an empty Response if there are no further processors in the chain. + * <p> + * Do not cache this as it may hold references to objects that should be garbage collected. + * + * @author bratseth + */ +public class Response extends ListenableFreezableClass { + + private final static CompoundName freezeListenerKey =new CompoundName("processing.freezeListener"); + + private final DataList<?> data; + + /** Creates a request containing an empty array data list */ + public Response(Request request) { + this(ArrayDataList.create(request)); + } + + /** Creates a response containing a list of data */ + public Response(DataList<?> data) { + this.data = data; + + Runnable freezeListener = null; + Request request = data.request(); + if (request != null) // subclasses of DataList may not ensure this + freezeListener = (Runnable)request.properties().get(freezeListenerKey); + if (freezeListener != null) { + if (freezeListener instanceof ResponseReceiver) + ((ResponseReceiver)freezeListener).setResponse(this); + data.addFreezeListener(freezeListener, MoreExecutors.directExecutor()); + } + } + + /** + * Convenience constructor which adds the given error message to the given request + */ + public Response(Request request, ErrorMessage errorMessage) { + this(ArrayDataList.create(request)); + request.errors().add(errorMessage); + } + + /** + * Processors which merges another request into this must call this method to notify the response. + * This does not modify the data of either response. + */ + public void mergeWith(Response other) { + } + + /** + * Returns the top level list of data items of this response + */ + public DataList data() { + return data; + } + + // ------ static utilities ---------------------------------------------------------------------------- + + /** + * Returns a future in which the given data list and all lists nested within it are completed. + * The only use of the returned future is to call a get() method on it to complete the given dataList and + * all dataLists nested below it recursively. + * <p> + * Lists are completed in prefix, depth-first order. DataLists added after the point when this method is called + * will not be completed. + * + * @param rootDataList the list to complete recursively + * @return the future in which all data in and below this list is complete, as the given root dataList for convenience + */ + public static <D extends Data> ListenableFuture<DataList<D>> recursiveComplete(DataList<D> rootDataList) { + List<ListenableFuture<DataList<D>>> futures = new ArrayList<>(); + collectCompletionFutures(rootDataList, futures); + return new CompleteAllOnGetFuture<D>(futures); + } + + @SuppressWarnings("unchecked") + private static <D extends Data> void collectCompletionFutures(DataList<D> dataList, List<ListenableFuture<DataList<D>>> futures) { + futures.add(dataList.complete()); + for (D data : dataList.asList()) { + if (data instanceof DataList) + collectCompletionFutures((DataList<D>) data, futures); + } + } + + /** + * A future which on get calls get on all its given futures and sets the value returned from the + * first given future as its result. + */ + private static class CompleteAllOnGetFuture<D extends Data> extends AbstractFuture<DataList<D>> { + + private final List<ListenableFuture<DataList<D>>> futures; + + public CompleteAllOnGetFuture(List<ListenableFuture<DataList<D>>> futures) { + this.futures = new ArrayList<>(futures); + } + + @Override + public DataList<D> get() throws InterruptedException, ExecutionException { + DataList<D> result = null; + for (ListenableFuture<DataList<D>> future : futures) { + if (result == null) + result = future.get(); + else + future.get(); + } + set(result); + return result; + } + + @Override + public DataList<D> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + DataList<D> result = null; + long timeLeft = unit.toMillis(timeout); + long currentCallStart = SystemTimer.INSTANCE.milliTime(); + for (ListenableFuture<DataList<D>> future : futures) { + if (result == null) + result = future.get(timeLeft, TimeUnit.MILLISECONDS); + else + future.get(timeLeft, TimeUnit.MILLISECONDS); + long currentCallEnd = SystemTimer.INSTANCE.milliTime(); + timeLeft -= (currentCallEnd - currentCallStart); + if (timeLeft <= 0) break; + currentCallStart = currentCallEnd; + } + set(result); + return result; + } + + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/AsyncExecution.java b/container-core/src/main/java/com/yahoo/processing/execution/AsyncExecution.java new file mode 100644 index 00000000000..2c40165f8e5 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/AsyncExecution.java @@ -0,0 +1,156 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.component.chain.Chain; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.FutureResponse; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.*; + +/** + * <p>Provides asynchronous execution of processing chains. Usage:</p> + * + * <pre> + * Execution execution = new Execution(chain); + * AsyncExecution asyncExecution = new AsyncExecution(execution); + * Future<Response> future = asyncExecution.process(request) + * try { + * result = future.get(timeout, TimeUnit.milliseconds); + * } catch(TimeoutException e) { + * // Handle timeout + * } + * </pre> + * + * <p> + * The request is not thread safe. A clone() must be made for each parallel processing. + * </p> + * + * @author bratseth + * @see Execution + */ +public class AsyncExecution { + + private static final ThreadFactory threadFactory = ThreadFactoryFactory.getThreadFactory("processing"); + + private static final Executor executorMain = createExecutor(); + private static final Executor createExecutor() { + ThreadPoolExecutor executor = new ThreadPoolExecutor(100, + Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(false), threadFactory); + // Prestart needed, if not all threads will be created by the fist N tasks and hence they might also + // get the dreaded thread locals initialized even if they will never run. + // That counters what we we want to achieve with the Q that will prefer thread locality. + executor.prestartAllCoreThreads(); + return executor; + } + + /** + * The execution of this + */ + private final Execution execution; + + /** + * Create an async execution of a single processor + */ + public AsyncExecution(Processor processor, Execution parent) { + this(new Execution(processor, parent)); + } + + /** + * Create an async execution of a chain + */ + public AsyncExecution(Chain<? extends Processor> chain, Execution parent) { + this(new Execution(chain, parent)); + } + + /** + * Creates an async execution from an existing execution. This async + * execution will execute the chain from the given execution, starting + * from the next processor in that chain. This is handy to execute + * multiple executions to the rest of the chain in parallel. + * <p> + * The state of the given execution is read on construction of this and not + * used later - the argument execution can be reused for other purposes. + * + * @param execution the execution from which the state of this is created + */ + public AsyncExecution(Execution execution) { + this.execution = new Execution(execution); + } + + /** + * Performs an async processing. Note that the given request cannot be simultaneously + * used in multiple such processings - a clone must be created for each. + */ + public FutureResponse process(Request request) { + return getFutureResponse(new Callable<Response>() { + @Override + public Response call() { + return execution.process(request); + } + }, request); + } + + private static <T> Future<T> getFuture(final Callable<T> callable) { + FutureTask<T> future = new FutureTask<>(callable); + executorMain.execute(future); + return future; + } + + private FutureResponse getFutureResponse(Callable<Response> callable, Request request) { + FutureResponse future = new FutureResponse(callable, execution, request); + executorMain.execute(future.delegate()); + return future; + } + + /* + * Waits for all futures until the given timeout. If a FutureResult isn't + * done when the timeout expires, it will be cancelled, and it will return a + * response. All unfinished Futures will be cancelled. + * + * @return the list of responses in the same order as returned from the task collection + */ + // Note that this may also be achieved using guava Futures. Not sure if this should be deprecated because of it. + public static List<Response> waitForAll(Collection<FutureResponse> tasks, long timeout) { + + // Copy the list in case it is modified while we are waiting + List<FutureResponse> workingTasks = new ArrayList<>(tasks); + @SuppressWarnings({"rawtypes", "unchecked"}) + Future task = getFuture(new Callable() { + @Override + public List<Future> call() { + for (FutureResponse task : workingTasks) { + task.get(); + } + return null; + } + }); + + try { + task.get(timeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException | InterruptedException | ExecutionException e) { + // Handle timeouts below + } + + List<Response> responses = new ArrayList<>(tasks.size()); + for (FutureResponse future : workingTasks) + responses.add(getTaskResponse(future)); + return responses; + } + + private static Response getTaskResponse(FutureResponse future) { + if (future.isDone() && !future.isCancelled()) { + return future.get(); // Since isDone() = true, this won't block. + } else { // Not done and no errors thrown + return new Response(future.getRequest(), new ErrorMessage("Timed out waiting for " + future)); + } + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/Execution.java b/container-core/src/main/java/com/yahoo/processing/execution/Execution.java new file mode 100644 index 00000000000..98bc3485084 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/Execution.java @@ -0,0 +1,487 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.collections.Pair; +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.chain.ChainRegistry; +import com.yahoo.yolean.trace.TraceNode; +import com.yahoo.yolean.trace.TraceVisitor; + +import java.util.Iterator; + +/** + * An execution of a chain. This keeps tracks of the progress of the execution and is called by the + * processors (using {@link #process} to move the execution to the next one. + * + * @author bratseth + */ +public class Execution { + + /** + * The index of the searcher in the chain which should be executed on the next call + * An Execution instance contains the state of a chain execution by + * providing a class stack for a chain - when a processor is called + * (through this), it will increment the index of the processor to call next, + * each time a processor returns (regardless of how) it will do the opposite. + */ + private int processorIndex; + + private final Chain<? extends Processor> chain; + + private final Trace trace; + + private final Environment<? extends Processor> environment; + + /** + * Creates an execution of a single processor + * + * @param processor the processor to execute in this + * @param execution the parent execution of this + */ + public Execution(Processor processor, Execution execution) { + this(new Chain<>(processor), execution); + } + + /** Creates an execution of a single processor which is not in the context of an existing execution */ + public static Execution createRoot(Processor processor, int traceLevel, Environment<? extends Processor> environment) { + return createRoot(new Chain<>(processor), traceLevel, environment); + } + + /** + * Creates an execution which is not in the context of an existing execution + * + * @param chain the chain to execute + * @param traceLevel the level to emit trace at + * @param environment the execution environment to use + */ + public static Execution createRoot(Chain<? extends Processor> chain, int traceLevel, + Environment<? extends Processor> environment) { + return new Execution(chain, 0, Trace.createRoot(traceLevel), environment); + } + + /** + * Creates an execution of a chain + * + * @param chain the chain to execute in this, starting from the first processor + * @param execution the parent execution of this + */ + public Execution(Chain<? extends Processor> chain, Execution execution) { + this(chain, 0, execution.trace().createChild(), execution.environment().nested()); + } + + /** + * Creates an execution from another. This execution will start at the next processor of the + * given execution. The given execution can continue independently of this. + */ + public Execution(Execution startPoint) { + this(startPoint.chain, startPoint.processorIndex, startPoint.trace.createChild(), startPoint.environment().nested()); + } + + /** + * Creates a new execution by setting the internal state directly. + * + * @param chain the chain to execute + * @param startIndex the start index into that chain + * @param trace the context <b>of this</b>. If this is created from an existing execution/context, + * be sure to do <code> new Context<COMPONENT>(startPoint.context)</code> first! + * @param environment the static execution environment to use + */ + protected Execution(Chain<? extends Processor> chain, int startIndex, Trace trace, Environment<? extends Processor> environment) { + if (chain == null) throw new NullPointerException("Chain cannot be null"); + this.chain = chain; + this.processorIndex = startIndex; + this.trace = trace; + this.environment = environment; + } + + /** + * Calls process on the next processor in this chain. If there is no next, an empty response is returned. + */ + public Response process(Request request) { + Processor processor = next(); + if (processor == null) + return defaultResponse(request); + + Response response = null; + try { + nextProcessor(); + onInvoking(request, processor); + response = processor.process(request, this); + if (response == null) + throw new NullPointerException(processor + " returned null, not a Response object"); + return response; + } finally { + previousProcessor(); + onReturning(request, processor, response); + } + } + + /** + * Returns the index into the chain of processors which is currently next + */ + protected int nextIndex() { + return processorIndex; + } + + /** + * A hook called when a processor is to be invoked. Overriding methods must call super.onInvoking + */ + protected void onInvoking(Request request, Processor next) { + if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) { + int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel(); + trace.trace("Invoke " + next, traceAt); + } + if (Trace.Level.Dependencies.includes(trace.getTraceLevel())) + trace.trace(next.getId() + " " + next.getDependencies().toString(), trace.getTraceLevel()); + } + + /** + * A hook called when a processor returns, either normally or by throwing. + * Overriding methods must call super.onReturning + * + * @param request the processing request + * @param processor the processor which returned + * @param response the response returned, or null if the processor returned by throwing + */ + protected void onReturning(Request request, Processor processor, Response response) { + if (Trace.Level.Step.includes(trace.getTraceLevel()) || trace.getForceTimestamps()) { + int traceAt = trace.getForceTimestamps() ? 1 : trace.getTraceLevel(); + trace.trace("Return " + processor, traceAt); + } + } + + /** Move this execution to the previous processor */ + protected void previousProcessor() { + processorIndex--; + } + + /** Move this execution to the next processor */ + protected void nextProcessor() { + processorIndex++; + } + + /** Returns the next searcher to be invoked in this chain, or null if we are at the last */ + protected Processor next() { + if (chain.components().size() <= processorIndex) return null; + return chain.components().get(processorIndex); + } + + /** Returns the chain this executes */ + public Chain<? extends Processor> chain() { + return chain; + } + + /** + * Creates the default response to return from this kind of execution when there are no further processors. + * If this is overridden, make sure to propagate any freezeListener from this to the returned response + * top-level DataList. + */ + protected Response defaultResponse(Request request) { + return new Response(request); + } + + public String toString() { + return "execution of chain '" + chain.getId() + "'"; + } + + public Trace trace() { + return trace; + } + + public Environment<? extends Processor> environment() { + return environment; + } + + /** + * Holds the static execution environment for the duration of an execution + */ + public static class Environment<COMPONENT extends Processor> { + + private final ChainRegistry<COMPONENT> chainRegistry; + + /** + * Creates an empty environment. Only useful for some limited testing + */ + public static <C extends Processor> Environment<C> createEmpty() { + return new Environment<>(new ChainRegistry<>()); + } + + /** + * Returns an environment for an execution spawned from the execution having this environment. + */ + public Environment<COMPONENT> nested() { + return this; // this is immutable, subclasses might want to do something else though + } + + /** + * Creates a new environment + */ + public Environment(ChainRegistry<COMPONENT> chainRegistry) { + this.chainRegistry = chainRegistry; + } + + /** + * Returns the processing chain registry of this execution environment. + * The registry may be empty, but never null. + */ + public ChainRegistry<COMPONENT> chainRegistry() { + return chainRegistry; + } + + } + + /** + * Tre trace of this execution. This is a facade into a node in the larger trace tree which captures + * the information about all executions caused by some request + * + * @author bratseth + */ + public static class Trace { + + /** + * The node in the trace tree capturing this execution + */ + private final TraceNode traceNode; + + /** + * The highest level of tracing this should record + */ + private int traceLevel; + + /** + * If true, do timing logic, even though trace level is low. + */ + private boolean forceTimestamps; + + /** + * Creates an empty root trace with a given level of tracing + */ + public static Trace createRoot(int traceLevel) { + return new Trace(traceLevel, new TraceNode(null, timestamp(traceLevel, false)), false); + } + + /** + * Creates a trace node below a parent + */ + public Trace createChild() { + TraceNode child = new TraceNode(null, timestamp(traceLevel, forceTimestamps)); + traceNode.add(child); + return new Trace(getTraceLevel(), child, forceTimestamps); + } + + /** + * Creates a new instance by assigning the internal state of this directly + */ + private Trace(int traceLevel, TraceNode traceNode, boolean forceTimestamps) { + this.traceLevel = traceLevel; + this.traceNode = traceNode; + this.forceTimestamps = forceTimestamps; + } + + /** + * Returns the maximum trace level this will record + */ + public int getTraceLevel() { + return traceLevel; + } + + /** + * Sets the maximum trace level this will record + */ + public void setTraceLevel(int traceLevel) { + this.traceLevel = traceLevel; + } + + public void setForceTimestamps(boolean forceTimestamps) { + this.forceTimestamps = forceTimestamps; + } + + public boolean getForceTimestamps() { + return forceTimestamps; + } + + /** + * Adds a trace message to this trace, if this trace has at most the given trace level + */ + public void trace(String message, int traceLevel) { + trace((Object)message, traceLevel); + } + public void trace(Object message, int traceLevel) { + if (this.traceLevel >= traceLevel) { + traceNode.add(new TraceNode(message, timestamp(traceLevel, forceTimestamps))); + } + } + + /** + * Adds a key-value which will be logged to the access log of this request. + * Multiple values may be set to the same key. A value cannot be removed once set, + * but it can be overwritten by adding another value for the same key. + */ + public void logValue(String key, String value) { + traceNode.add(new TraceNode(new LogValue(key, value), 0)); + } + + /** + * Returns the values that should be written to the access log set in the entire trace node tree + */ + public Iterator<LogValue> logValueIterator() { + return traceNode.root().descendants(LogValue.class).iterator(); + } + + /** + * Visits the entire trace tree + * + * @return the argument visitor for convenience + */ + public <VISITOR extends TraceVisitor> VISITOR accept(VISITOR visitor) { + return traceNode.root().accept(visitor); + } + + /** + * Adds a property key-value to this trace. + * Values are looked up by reverse depth-first search in the trace node tree. + * + * @param name the name of the property + * @param value the value of the property, or null to set this property to null + */ + public void setProperty(String name, Object value) { + traceNode.add(new TraceNode(new Pair<>(name, value), 0)); + } + + /** + * Returns a property set anywhere in the trace tree this points to. + * Note that even though this call is itself "thread robust", the object values returned + * may in some scenarios not be written behind a synchronization barrier, so when accessing + * objects which are not inherently thread safe, synchronization should be considered. + * <p> + * This method have a time complexity which is proportional to + * the number of trace nodes in the tree + * + * @return the value of this property, or null if none + */ + public Object getProperty(String name) { + return accept(new PropertyValueVisitor(name)).foundValue(); + } + + /** + * Returns the trace node peer of this + */ + public TraceNode traceNode() { + return traceNode; + } + + /** + * Returns a short string description of this + */ + @Override + public String toString() { + return "trace: " + traceNode; + } + + private static long timestamp(int traceLevel, boolean forceTimestamps) { + return (forceTimestamps || Level.Timestamp.includes(traceLevel)) ? System.currentTimeMillis() : 0; + } + + /** + * Visits all trace nodes to collect the last set value of a particular property in a trace tree + */ + private static class PropertyValueVisitor extends TraceVisitor { + + /** + * The name of the property to find + */ + private String name; + private Object foundValue = null; + + public PropertyValueVisitor(String name) { + this.name = name; + } + + @Override + public void visit(TraceNode node) { + if (node.payload() == null) return; + if (!(node.payload() instanceof Pair)) return; + + Pair property = (Pair) node.payload(); + if (!property.getFirst().equals(name)) return; + foundValue = property.getSecond(); + } + + public Object foundValue() { + return foundValue; + } + + } + + /** + * An immutable access log value added to the trace + */ + public static class LogValue { + + private final String key; + private final String value; + + public LogValue(String key, String value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } + + @Override + public String toString() { + return key + "=" + value; + } + + } + + /** + * Defines what information is added at which trace level + */ + public enum Level { + + /** + * Every processing step initiated is traced + */ + Step(4), + /** + * All trace messages are timestamped + */ + Timestamp(6), + /** + * The before/after dependencies of each processing step is traced on every invocation + */ + Dependencies(7); + + /** + * The smallest trace level at which this information will be traced + */ + private int value; + + Level(int value) { + this.value = value; + } + + public int value() { + return value; + } + + /** + * Returns whether this level includes the given level, i.e whether traceLevel is this.value() or more + */ + public boolean includes(int traceLevel) { + return traceLevel >= this.value(); + } + + } + } +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java b/container-core/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java new file mode 100644 index 00000000000..a95771c1202 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/ExecutionWithResponse.java @@ -0,0 +1,36 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; + +/** + * An execution which has a response which is returned when this gets to the end of the chain. + * This is useful to run processing chains where a response exists up front, typically for on completion listeners. + * + * @author bratseth + */ +public class ExecutionWithResponse extends Execution { + + private Response response; + + /** + * Creates an execution which will return a given response at the end of the chain. + * + * @param chain the chain to execute in this + * @param response the response this will return from {@link #process} then the end of this chain is reached + * @param execution the the parent of this execution + */ + public ExecutionWithResponse(Chain<? extends Processor> chain, Response response, Execution execution) { + super(chain, execution); + this.response = response; + } + + @Override + protected Response defaultResponse(Request request) { + return response; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java b/container-core/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java new file mode 100644 index 00000000000..053459166e6 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/ResponseReceiver.java @@ -0,0 +1,17 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.processing.Response; + +/** + * An interface for classes which can be given responses. + * Freeze listeners may implement this to be handed the response + * before they are run. There is probably no other sensible use for this. + * + * @author bratseth + */ +public interface ResponseReceiver { + + public void setResponse(Response response); + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/RunnableExecution.java b/container-core/src/main/java/com/yahoo/processing/execution/RunnableExecution.java new file mode 100644 index 00000000000..e69cb8e48cd --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/RunnableExecution.java @@ -0,0 +1,52 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution; + +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; + +/** + * An adaptor of an Execution to a runnable. Calling run on this causes process to be called on the + * given processor. + * + * @author bratseth + */ +public class RunnableExecution implements Runnable { + + private final Request request; + private final Execution execution; + private Response response = null; + private Throwable exception = null; + + public RunnableExecution(Request request, Execution execution) { + this.request = request; + this.execution = execution; + } + + /** + * Calls process on the execution of this. + * This will result in either response or exception being set on this. + * Calling this never throws an exception. + */ + public void run() { + try { + response = execution.process(request); + } catch (Exception e) { + exception = e; // TODO: Log + } + } + + /** + * Returns the response from executing this, or null if exception is set or run() has not been called yet + */ + public Response getResponse() { + return response; + } + + /** + * Returns the exception from executing this, or null if response is set or run() has not been called yet + */ + public Throwable getException() { + return exception; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java b/container-core/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java new file mode 100644 index 00000000000..dbf31de1b72 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/chain/ChainRegistry.java @@ -0,0 +1,14 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.execution.chain; + +import com.yahoo.component.chain.Chain; +import com.yahoo.component.chain.ChainedComponent; +import com.yahoo.component.provider.ComponentRegistry; + +/** + * A registry of chains + * + * @author Tony Vaagenes + */ +public class ChainRegistry<T extends ChainedComponent> extends ComponentRegistry<Chain<T>> { +} diff --git a/container-core/src/main/java/com/yahoo/processing/execution/chain/package-info.java b/container-core/src/main/java/com/yahoo/processing/execution/chain/package-info.java new file mode 100644 index 00000000000..6d82609bc76 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/chain/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.execution.chain; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-core/src/main/java/com/yahoo/processing/execution/package-info.java b/container-core/src/main/java/com/yahoo/processing/execution/package-info.java new file mode 100644 index 00000000000..6430aa32ae3 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/execution/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.execution; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-core/src/main/java/com/yahoo/processing/package-info.java b/container-core/src/main/java/com/yahoo/processing/package-info.java new file mode 100644 index 00000000000..b39272d881a --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/package-info.java @@ -0,0 +1,22 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Java library for request-response data processing. + * + * This library defines request-response processing as an operation which + * accepts a Request and produces a Response containing Data by executing + * a Chain of processing components in a single worker thread using method + * calls for chaining, i.e a synchronous processing model. + * Data for the Response may optionally be produced asynchronously. + * + * The processing model can be implemented by subtyping in frameworks defining + * a processing model (with a richer, more specific API) for a particular domain. + */ +@ExportPackage +@PublicApi package com.yahoo.processing; + +// TODO: +// - Look through all instances where we pass executor and consider if we should allow the caller to decide the thread +// - Should data listener use a typed interface rather than runnable` + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-core/src/main/java/com/yahoo/processing/request/CloneHelper.java b/container-core/src/main/java/com/yahoo/processing/request/CloneHelper.java new file mode 100644 index 00000000000..837ff3db295 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/CloneHelper.java @@ -0,0 +1,124 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +import com.yahoo.collections.MethodCache; +import com.yahoo.component.provider.FreezableClass; +import com.yahoo.processing.request.properties.PublicCloneable; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.logging.Logger; +import java.util.LinkedList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.HashMap; + +/** + * Helps to deep clone complex objects + * The following classes and their subclasses does have a fastpath + * - com.yahoo.component.provider.FreezableClass + * - com.yahoo.processing.request.properties.PublicCloneable BTW, this is the one you should implement too + * if you want the fastpath. + * - java.util.LinkedList + * - java.util.ArrayList + * The rest has the slow path with reflection, + * though using a fast thread safe method cache for speedup. + * + * @author bratseth + * @author baldersheim + */ +public class CloneHelper { + + private static Logger log = Logger.getLogger(CloneHelper.class.getName()); + private static final MethodCache cloneMethodCache = new MethodCache("clone"); + + /** + * Clones this object if it is clonable, and the clone is public. Returns null if not + */ + public final Object clone(Object object) { + if (object == null) return null; + if ( ! (object instanceof Cloneable)) return null; + if (object.getClass().isArray()) + return arrayClone(object); + else + return objectClone(object); + } + + private Object arrayClone(Object array) { + if (array instanceof Object[]) + return objectArrayClone((Object[]) array); + else if (array instanceof byte[]) + return Arrays.copyOf((byte[])array, ((byte[])array).length); + else if (array instanceof char[]) + return Arrays.copyOf((char[])array, ((char[])array).length); + else if (array instanceof short[]) + return Arrays.copyOf((short[])array, ((short[])array).length); + else if (array instanceof int[]) + return Arrays.copyOf((int[])array, ((int[])array).length); + else if (array instanceof long[]) + return Arrays.copyOf((long[])array, ((long[])array).length); + else if (array instanceof float[]) + return Arrays.copyOf((float[])array, ((float[])array).length); + else if (array instanceof double[]) + return Arrays.copyOf((double[])array, ((double[])array).length); + else if (array instanceof boolean[]) + return Arrays.copyOf((boolean[])array, ((boolean[])array).length); + else + return new IllegalArgumentException("Unexpected primitive array type " + array.getClass()); + } + + private Object objectArrayClone(Object[] object) { + Object[] arrayClone = Arrays.copyOf(object, object.length); + // deep clone + for (int i = 0; i < arrayClone.length; i++) { + Object elementClone = clone(arrayClone[i]); + if (elementClone != null) + arrayClone[i] = elementClone; + } + return arrayClone; + } + + protected Object objectClone(Object object) { + // Fastpath for our commonly used classes + if (object instanceof FreezableClass) + return ((FreezableClass)object).clone(); + else if (object instanceof PublicCloneable) + return ((PublicCloneable<?>)object).clone(); + else if (object instanceof LinkedList) + return ((LinkedList<?>) object).clone(); + else if (object instanceof ArrayList) + return ((ArrayList<?>) object).clone(); + + try { + Method cloneMethod = cloneMethodCache.get(object); + if (cloneMethod == null) { + log.warning("'" + object + "' of class " + object.getClass() + + " is Cloneable, but has no clone method - will use the same instance in all requests"); + return null; + } + return cloneMethod.invoke(object); + } catch (IllegalAccessException e) { + log.warning("'" + object + "' of class " + object.getClass() + + " is Cloneable, but clone method cannot be accessed - will use the same instance in all requests"); + return null; + } catch (InvocationTargetException e) { + throw new RuntimeException("Exception cloning '" + object + "'", e); + } + } + + /** + * Clones a map by deep cloning each value which is cloneable and shallow copying all other values. + */ + public Map<CompoundName, Object> cloneMap(Map<CompoundName, Object> map) { + Map<CompoundName, Object> cloneMap = new HashMap<>(map.size()); + for (Map.Entry<CompoundName, Object> entry : map.entrySet()) { + Object cloneValue = clone(entry.getValue()); + if (cloneValue == null) + cloneValue = entry.getValue(); // Shallow copy objects which does not support cloning + cloneMap.put(entry.getKey(), cloneValue); + } + return cloneMap; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/request/CompoundName.java b/container-core/src/main/java/com/yahoo/processing/request/CompoundName.java new file mode 100644 index 00000000000..432c7473c2b --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/CompoundName.java @@ -0,0 +1,288 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static com.yahoo.text.Lowercase.toLowerCase; + +/** + * An immutable compound name of the general form "a.bb.ccc", + * where there can be any number of such compounds, including one or zero. + * <p> + * Using CompoundName is generally substantially faster than using strings. + * + * @author bratseth + */ +public final class CompoundName { + + /** + * The string name of this compound. + */ + private final String name; + private final String lowerCasedName; + + private final ImmutableList<String> compounds; + + /** A hashcode which is always derived from the compounds (NEVER the string) */ + private final int hashCode; + + /** This name with the first component removed */ + private final CompoundName rest; + + /** The empty compound */ + public static final CompoundName empty = new CompoundName(""); + + /** + * Constructs this from a string which may contains dot-separated components + * + * @throws NullPointerException if name is null + */ + public CompoundName(String name) { + this(name, parse(name)); + } + + /** Constructs this from an array of name components which are assumed not to contain dots */ + public static CompoundName fromComponents(String ... components) { + return new CompoundName(Arrays.asList(components)); + } + + /** Constructs this from a list of compounds. */ + public CompoundName(List<String> compounds) { + this(toCompoundString(compounds), compounds); + } + + /** + * Constructs this from a name with already parsed compounds. + * Private to avoid creating names with inconsistencies. + * + * @param name the string representation of the compounds + * @param compounds the compounds of this name + */ + private CompoundName(String name, List<String> compounds) { + if (name == null) throw new NullPointerException("Name can not be null"); + + this.name = name; + this.lowerCasedName = toLowerCase(name); + if (compounds.size()==1 && compounds.get(0).isEmpty()) + this.compounds = ImmutableList.of(); + else + this.compounds = ImmutableList.copyOf(compounds); + this.hashCode = this.compounds.hashCode(); + + int size = this.compounds.size(); + rest = size > 1 ? new CompoundName(compounds.subList(1, size)) + : size == 1 ? empty : this; // size==0 -> this needed during construction of empty + } + + private static List<String> parse(String s) { + ArrayList<String> l = null; + + int p = 0; + final int m = s.length(); + for (int i = 0; i < m; i++) { + if (s.charAt(i) == '.') { + if (l == null) l = new ArrayList<>(8); + l.add(s.substring(p, i)); + p = i + 1; + } + } + if (p == 0) { + return ImmutableList.of(s); + } else if (p < m) { + l.add(s.substring(p, m)); + } else { + throw new IllegalArgumentException("'" + s + "' is not a legal compound name. Names can not end with a dot."); + } + return l; + } + + /** + * Returns a compound name which has the given compound string appended to it + * + * @param name if name is empty this returns <code>this</code> + */ + public CompoundName append(String name) { + if (name.isEmpty()) return this; + if (isEmpty()) return new CompoundName(name); + List<String> newCompounds = new ArrayList<>(compounds); + newCompounds.addAll(parse(name)); + return new CompoundName(concat(this.name, name), newCompounds); + } + + /** + * Returns a compound name which has the given compounds appended to it + * + * @param name if name is empty this returns <code>this</code> + */ + public CompoundName append(CompoundName name) { + if (name.isEmpty()) return this; + if (isEmpty()) return name; + List<String> newCompounds = new ArrayList<>(compounds); + newCompounds.addAll(name.compounds); + return new CompoundName(concat(this.name, name.name), newCompounds); + } + + private String concat(String name1, String name2) { + return name1 + "." + name2; + } + + /** + * Returns a compound name which has the given name components prepended to this name, + * in the given order, i.e new ComponentName("c").prepend("a","b") will yield "a.b.c". + * + * @param nameParts if name is empty this returns <code>this</code> + */ + public CompoundName prepend(String ... nameParts) { + if (nameParts.length == 0) return this; + if (isEmpty()) return fromComponents(nameParts); + + List<String> newCompounds = new ArrayList<>(nameParts.length + compounds.size()); + newCompounds.addAll(Arrays.asList(nameParts)); + newCompounds.addAll(this.compounds); + return new CompoundName(newCompounds); + } + + /** + * Returns the name after the last dot. If there are no dots, the full name is returned. + */ + public String last() { + if (compounds.isEmpty()) return ""; + return compounds.get(compounds.size() - 1); + } + + /** + * Returns the name before the first dot. If there are no dots the full name is returned. + */ + public String first() { + if (compounds.isEmpty()) return ""; + return compounds.get(0); + } + + /** + * Returns the first n components of this. + * + * @throws IllegalArgumentException if this does not have at least n components + */ + public CompoundName first(int n) { + if (compounds.size() < n) + throw new IllegalArgumentException("Asked for the first " + n + " components but '" + + this + "' only have " + compounds.size() + " components."); + return new CompoundName(compounds.subList(0, n)); + } + + /** + * Returns the name after the first dot, or "" if this name has no dots + */ + public CompoundName rest() { return rest; } + + /** + * Returns the name starting after the n first components (i.e dots). + * This may be the empty name. + * + * @throws IllegalArgumentException if this does not have at least that many components + */ + public CompoundName rest(int n) { + if (n == 0) return this; + if (compounds.size() < n) + throw new IllegalArgumentException("Asked for the rest after " + n + " components but '" + + this + "' only have " + compounds.size() + " components."); + if (n == 1) return rest(); + if (compounds.size() == n) return empty; + return rest.rest(n - 1); + } + + /** + * Returns the number of compound elements in this. Which is exactly the number of dots in the string plus one. + * The size of an empty compound is 0. + */ + public int size() { + return compounds.size(); + } + + /** + * Returns the compound element as the given index + */ + public String get(int i) { + return compounds.get(i); + } + + /** + * Returns a compound which have the name component at index i set to the given name. + * As an optimization, if the given name == the name component at this index, this is returned. + */ + public CompoundName set(int i, String name) { + if (get(i) == name) return this; + List<String> newCompounds = new ArrayList<>(compounds); + newCompounds.set(i, name); + return new CompoundName(newCompounds); + } + + /** + * Returns whether this name has more than one element + */ + public boolean isCompound() { + return compounds.size() > 1; + } + + public boolean isEmpty() { + return compounds.isEmpty(); + } + + /** + * Returns whether the given name is a prefix of this. + * Prefixes are taken on the component, not character level, so + * "a" is a prefix of "a.b", but not a prefix of "ax.b + */ + public boolean hasPrefix(CompoundName prefix) { + if (prefix.size() > this.size()) return false; + + int prefixLength = prefix.name.length(); + if (prefixLength == 0) + return true; + + if (name.length() > prefixLength && name.charAt(prefixLength) != '.') + return false; + + return name.startsWith(prefix.name); + } + + /** + * Returns an immutable list of the components of this + */ + public List<String> asList() { + return compounds; + } + + @Override + public int hashCode() { return hashCode; } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof CompoundName)) return false; + CompoundName other = (CompoundName)o; + return this.name.equals(other.name); + } + + /** + * Returns the string representation of this - all the name components in order separated by dots. + */ + @Override + public String toString() { return name; } + + public String getLowerCasedName() { + return lowerCasedName; + } + + private static String toCompoundString(List<String> compounds) { + StringBuilder b = new StringBuilder(); + for (String compound : compounds) + b.append(compound).append("."); + return b.length()==0 ? "" : b.substring(0, b.length()-1); + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/request/ErrorMessage.java b/container-core/src/main/java/com/yahoo/processing/request/ErrorMessage.java new file mode 100644 index 00000000000..0ced664bfdc --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/ErrorMessage.java @@ -0,0 +1,217 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +/** + * An error encountered while processing a request. + * This can be subclassed to add error messages containing more information. + * <p> + * Error messages are immutable. + * + * @author bratseth + */ +public class ErrorMessage implements Cloneable { + + private final int code; + private final String message; + private final String detailedMessage; + private final Throwable cause; + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + */ + public ErrorMessage(String message) { + this(0, message, null, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + */ + public ErrorMessage(int code, String message) { + this(code, message, null, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param details a longer detail description of this condition + */ + public ErrorMessage(String message, String details) { + this(0, message, details, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + * @param details a longer detail description of this condition + */ + public ErrorMessage(int code, String message, String details) { + this(code, message, details, null); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param cause the cause of this error + */ + public ErrorMessage(String message, Throwable cause) { + this(0, message, null, cause); + } + + /** + * Creates an error + * + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + * @param message the textual message describing this condition tersely + * @param cause the cause of this error + */ + public ErrorMessage(int code, String message, Throwable cause) { + this(code, message, null, cause); + } + + /** + * Creates an error + * + * @param message the textual message describing this condition tersely + * @param details a longer detail description of this condition + * @param cause the cause of this error + */ + public ErrorMessage(String message, String details, Throwable cause) { + this(0, message, details, cause); + } + + /** + * Creates an error + * + * @param code an error code. If this is bound to HTTP request/responses and + * this error code is a HTTP status code, this code will be returned as the HTTP status + * @param message the textual message describing this condition tersely + * @param details a longer detail description of this condition + * @param cause the cause of this error + */ + public ErrorMessage(int code, String message, String details, Throwable cause) { + if (message == null) throw new NullPointerException("Message cannot be null"); + this.code = code; + this.message = message; + this.detailedMessage = details; + this.cause = cause; + } + + /** + * Returns the code of this message, or 0 if no code is set + */ + public int getCode() { + return code; + } + + /** + * Returns the error message, never null + */ + public String getMessage() { + return message; + } + + /** + * Returns detailed information about this error, or null if there is no detailed message + */ + public String getDetailedMessage() { + return detailedMessage; + } + + /** + * Returns the throwable associated with this error, or null if none + */ + public Throwable getCause() { + return cause; + } + + /** + * Returns a formatted message containing the information in this + */ + @Override + public String toString() { + if (code == 0 && detailedMessage == null && cause == null) return message; // shortcut + StringBuilder b = new StringBuilder(); + if (code != 0) + b.append(code).append(": "); + b.append(message); + if (detailedMessage != null) + b.append(": ").append(detailedMessage); + if (cause != null) + append(cause, b); + return b.toString(); + } + + private void append(Throwable t, StringBuilder b) { + String lastMessage = null; + String message; + for (; t != null; t = t.getCause(), lastMessage = message) { + message = getMessage(t); + if (message == null) continue; + if (lastMessage != null && lastMessage.equals(message)) continue; + if (b.length() > 0) + b.append(": "); + b.append(message); + } + } + + /** + * Returns a useful message from *this* exception, or null if none + */ + private static String getMessage(Throwable t) { + String message = t.getMessage(); + if (t.getCause() == null) { + if (message == null) return t.getClass().getSimpleName(); + } else { + if (message == null) return null; + if (message.equals(t.getCause().getClass().getName() + ": " + t.getCause().getMessage())) return null; + } + return message; + } + + @Override + public int hashCode() { + return code * 7 + message.hashCode() + (detailedMessage == null ? 0 : 17 * detailedMessage.hashCode()); + } + + /** + * Two error messages are equal if they have the same code and message. + * The cause is ignored in the comparison. + */ + @Override + public boolean equals(Object o) { + if (!(o instanceof ErrorMessage)) return false; + + ErrorMessage other = (ErrorMessage) o; + + if (this.code != other.code) return false; + + if (!this.message.equals(other.message)) return false; + + if (this.detailedMessage == null) return other.detailedMessage == null; + if (other.detailedMessage == null) return false; + + return this.detailedMessage.equals(other.detailedMessage); + } + + @Override + public ErrorMessage clone() { + try { + return (ErrorMessage) super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Programming error"); + } + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/request/Properties.java b/container-core/src/main/java/com/yahoo/processing/request/Properties.java new file mode 100644 index 00000000000..9362de59203 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/Properties.java @@ -0,0 +1,634 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request; + +import java.util.Map; +import java.util.HashMap; +import java.util.Collections; + +/** + * The properties of a request + * + * @author bratseth + */ +public class Properties implements Cloneable { + + private final static CloneHelper cloneHelper = new CloneHelper(); + private Properties chained = null; + + /** + * Sets the properties chained to this. + * + * @param chained the properties to chain to this, or null to make this the last in the chain + * @return the given chained object to allow setting up a chain by dotting in one statement + */ + public Properties chain(Properties chained) { + this.chained = chained; + return chained; + } + + /** + * Returns the properties chained to this, or null if this is the last in the chain + */ + public Properties chained() { + return chained; + } + + /** + * Returns the first instance of the given class in this chain, or null if none + */ + @SuppressWarnings("unchecked") + public final <T extends Properties> T getInstance(Class<T> propertyClass) { + if (propertyClass.isAssignableFrom(this.getClass())) return (T) this; + if (chained == null) return null; + return chained.getInstance(propertyClass); + } + + /** + * Lists all properties of this with no context, by delegating to listProperties("") + */ + public final Map<String, Object> listProperties() { + return listProperties(CompoundName.empty); + } + + /** + * Returns a snapshot of all properties of this - same as listProperties("",context) + */ + public final Map<String, Object> listProperties(Map<String, String> context) { + return listProperties(CompoundName.empty, context, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(CompoundName path) { + return listProperties(path, null, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(String path) { + return listProperties(new CompoundName(path), null, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(CompoundName path, Map<String, String> context) { + return listProperties(path, context, this); + } + + /** + * Returns a snapshot of all properties by calling listProperties(path,null) + */ + public final Map<String, Object> listProperties(String path, Map<String, String> context) { + return listProperties(new CompoundName(path), context, this); + } + + /** + * Returns a snapshot of all properties of this having a given path prefix + * <p> + * Some sources of properties may not be list-able (e.g those using reflection) + * and will not be included in this snapshot. + * + * + * @param path the prefix (up to a ".") of the properties to return, or null or the empty string to return all properties + * @param context the context used to resolve the properties, or null if none + * @param substitution the properties which will be used to do string substitution in the values added to the map + */ + public Map<String, Object> listProperties(CompoundName path, Map<String, String> context, Properties substitution) { + if (path == null) + path = CompoundName.empty; + if (chained() == null) + return new HashMap<>(); + else + return chained().listProperties(path, context, substitution); + } + + /** + * Returns a snapshot of all properties of this having a given path prefix + * <p> + * Some sources of properties may not be list-able (e.g those using reflection) + * and will not be included in this snapshot. + * + * + * @param path the prefix (up to a ".") of the properties to return, or null or the empty string to return all properties + * @param context the context used to resolve the properties, or null if none + * @param substitution the properties which will be used to do string substitution in the values added to the map + */ + public final Map<String, Object> listProperties(String path, Map<String, String> context, Properties substitution) { + return listProperties(new CompoundName(path), context, substitution); + } + + /** + * Gets a named value which (if necessary) is resolved using a property context. + * + * @param name the name of the property to return + * @param context the variant resolution context, or null if none + * @param substitution the properties used to substitute in these properties, or null if none + */ + public Object get(CompoundName name, Map<String, String> context, Properties substitution) { + if (chained == null) return null; + return chained.get(name, context, substitution); + } + + /** + * Gets a named value which (if necessary) is resolved using a property context + * + * @param name the name of the property to return + * @param context the variant resolution context, or null if none + * @param substitution the properties used to substitute in these properties, or null if none + */ + public final Object get(String name, Map<String, String> context, Properties substitution) { + return get(new CompoundName(name), context, substitution); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,context,this) + */ + public final Object get(CompoundName name, Map<String, String> context) { + return get(name, context, this); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,context,this) + */ + public final Object get(String name, Map<String, String> context) { + return get(new CompoundName(name), context, this); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,null,this) + */ + public final Object get(CompoundName name) { + return get(name, null, this); + } + + /** + * Gets a named value from the first chained instance which has one by calling get(name,null,this) + */ + public final Object get(String name) { + return get(new CompoundName(name), null, this); + } + + /** + * Gets a named value from the first chained instance which has one, + * or the default value if no value is set, or if the first value encountered is explicitly set to null. + * <p> + * This default implementation simply forwards to the chained instance, or returns the default if none + * + * + * @param name the name of the property to return + * @param defaultValue the default value returned if the value returned is null + */ + public final Object get(CompoundName name, Object defaultValue) { + Object value = get(name); + if (value == null) return defaultValue; + return value; + } + + /** + * Gets a named value from the first chained instance which has one, + * or the default value if no value is set, or if the first value encountered is explicitly set to null. + * <p> + * This default implementation simply forwards to the chained instance, or returns the default if none + * + * @param name the name of the property to return + * @param defaultValue the default value returned if the value returned is null + */ + public final Object get(String name, Object defaultValue) { + return get(new CompoundName(name), defaultValue); + } + + /** + * Sets a value to the first chained instance which accepts it. + * <p> + * This default implementation forwards to the chained instance or throws + * a RuntimeException if there is not chained instance. + * + * @param name the name of the property + * @param value the value to set. Setting a property to null clears it. + * @param context the context used to resolve where the values should be set, or null if none + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public void set(CompoundName name, Object value, Map<String, String> context) { + if (chained == null) throw new RuntimeException("Property '" + name + "->" + + value + "' was not accepted in this property chain"); + chained.set(name, value, context); + } + + /** + * Sets a value to the first chained instance which accepts it. + * <p> + * This default implementation forwards to the chained instance or throws + * a RuntimeException if there is not chained instance. + * + * @param name the name of the property + * @param value the value to set. Setting a property to null clears it. + * @param context the context used to resolve where the values should be set, or null if none + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void set(String name, Object value, Map<String, String> context) { + set(new CompoundName(name), value, context); + } + + /** + * Sets a value to the first chained instance which accepts it by calling set(name,value,null). + * + * @param name the name of the property + * @param value the value to set. Setting a property to null clears it. + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void set(CompoundName name, Object value) { + set(name, value, null); + } + + /** + * Sets a value to the first chained instance which accepts it by calling set(name,value,null). + * + * @param name the name of the property + * @param value the value to set. Setting a property to null clears it. + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void set(String name, Object value) { + set(new CompoundName(name), value, Collections.<String,String>emptyMap()); + } + + /** + * Sets all properties having this name as a compound prefix to null. + * I.e clearAll("a") will clear the value of "a" and "a.b" but not "ab". + * This default implementation forwards to the chained instance or throws + * a RuntimeException if there is not chained instance. + * + * @param name the compound prefix of the properties to clear + * @param context the context used to resolve where the values should be cleared, or null if none + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public void clearAll(CompoundName name, Map<String, String> context) { + if (chained == null) throw new RuntimeException("Property '" + name + + "' was not accepted in this property chain"); + chained.clearAll(name, context); + } + + /** + * Sets all properties having this name as a compound prefix to null. + * I.e clearAll("a") will clear the value of "a" and "a.b" but not "ab". + * + * @param name the compound prefix of the properties to clear + * @param context the context used to resolve where the values should be cleared, or null if none + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void clearAll(String name, Object value, Map<String, String> context) { + set(new CompoundName(name), value, context); + } + + /** + * Sets all properties having this name as a compound prefix to null. + * I.e clearAll("a") will clear the value of "a" and "a.b" but not "ab". + * + * @param name the compound prefix of the properties to clear + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void clearAll(CompoundName name) { + clearAll(name, null); + } + + /** + * Sets all properties having this name as a compound prefix to null. + * I.e clearAll("a") will clear the value of "a" and "a.b" but not "ab". + * + * @param name the compound prefix of the properties to clear + * @throws RuntimeException if no instance in the chain accepted this name-value pair + */ + public final void clearAll(String name) { + clearAll(new CompoundName(name), Collections.<String,String>emptyMap()); + } + + /** + * Gets a property as a boolean - if this value can reasonably be interpreted as a boolean, this will return + * the value. Returns false if this property is null. + */ + public final boolean getBoolean(CompoundName name) { + return getBoolean(name, false); + } + + /** + * Gets a property as a boolean - if this value can reasonably be interpreted as a boolean, this will return + * the value. Returns false if this property is null. + */ + public final boolean getBoolean(String name) { + return getBoolean(name, false); + } + + /** + * Gets a property as a boolean. + * This will return true only if the value is either the empty string, + * or any Object which has a toString which is case-insensitive equal to "true" + * + * @param defaultValue the value to return if this property is null + */ + public final boolean getBoolean(CompoundName key, boolean defaultValue) { + return asBoolean(get(key), defaultValue); + } + + /** + * Gets a property as a boolean. + * This will return true only if the value is either the empty string, + * or any Object which has a toString which is case-insensitive equal to "true" + * + * @param defaultValue the value to return if this property is null + */ + public final boolean getBoolean(String key, boolean defaultValue) { + return asBoolean(get(key), defaultValue); + } + + /** + * Converts a value to boolean - this will be true only if the value is either the empty string, + * or any Object which has a toString which is case-insensitive equal to "true" + */ + protected final boolean asBoolean(Object value, boolean defaultValue) { + if (value == null) return defaultValue; + + String s = value.toString(); + int sz = s.length(); + switch (sz) { + case 0: + return true; + case 4: + return ((s.charAt(0) | 0x20) == 't') && + ((s.charAt(1) | 0x20) == 'r') && + ((s.charAt(2) | 0x20) == 'u') && + ((s.charAt(3) | 0x20) == 'e'); + } + return false; + } + + /** + * Returns this property as a string + * + * @return this property as a string, or null if the property is null + */ + public final String getString(CompoundName key) { + return getString(key, null); + } + + /** + * Returns this property as a string + * + * @return this property as a string, or null if the property is null + */ + public final String getString(String key) { + return getString(key, null); + } + + /** + * Returns this property as a string + * + * @param key the property key + * @param defaultValue the value to return if this property is null + * @return this property as a string + */ + public final String getString(CompoundName key, String defaultValue) { + return asString(get(key), defaultValue); + } + + /** + * Returns this property as a string + * + * @param key the property key + * @param defaultValue the value to return if this property is null + * @return this property as a string + */ + public final String getString(String key, String defaultValue) { + return asString(get(key), defaultValue); + } + + protected final String asString(Object value, String defaultValue) { + if (value == null) return defaultValue; + return value.toString(); + } + + /** + * Returns a property as an Integer + * + * @return the integer value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but + * have a toString which is not parseable as a number + */ + public final Integer getInteger(CompoundName name) { + return getInteger(name, null); + } + + /** + * Returns a property as an Integer + * + * @return the integer value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but + * have a toString which is not parseable as a number + */ + public final Integer getInteger(String name) { + return getInteger(name, null); + } + + /** + * Returns a property as an Integer + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for the name + * @throws NumberFormatException if the given parameter does not exist + * or does not have a toString parseable as a number + */ + public final Integer getInteger(CompoundName name, Integer defaultValue) { + return asInteger(get(name), defaultValue); + } + + /** + * Returns a property as an Integer + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for the name + * @throws NumberFormatException if the given parameter does not exist + * or does not have a toString parseable as a number + */ + public final Integer getInteger(String name, Integer defaultValue) { + return asInteger(get(name), defaultValue); + } + + protected final Integer asInteger(Object value, Integer defaultValue) { + try { + if (value == null) + return defaultValue; + + if (value instanceof Number) + return ((Number)value).intValue(); + + String stringValue = value.toString(); + if (stringValue.isEmpty()) + return defaultValue; + + return Integer.valueOf(stringValue); + } catch (IllegalArgumentException e) { + throw new NumberFormatException("'" + value + "' is not a valid integer"); + } + } + + /** + * Returns a property as a Long + * + * @return the long value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(CompoundName name) { + return getLong(name, null); + } + + /** + * Returns a property as a Long + * + * @return the long value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(String name) { + return getLong(name, null); + } + + /** + * Returns a property as a Long + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(CompoundName name, Long defaultValue) { + return asLong(get(name), defaultValue); + } + + /** + * Returns a property as a Long + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Long getLong(String name, Long defaultValue) { + return asLong(get(name), defaultValue); + } + + protected final Long asLong(Object value, Long defaultValue) { + try { + if (value == null) + return defaultValue; + + if (value instanceof Long) + return (Long) value; + + String stringValue = value.toString(); + if (stringValue.isEmpty()) + return defaultValue; + + return Long.valueOf(value.toString()); + } catch (IllegalArgumentException e) { + throw new NumberFormatException("Not a valid long"); + } + } + + /** + * Returns a property as a Double + * + * @return the double value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(CompoundName name) { + return getDouble(name, null); + } + + /** + * Returns a property as a Double + * + * @return the double value of the name, or null if the property is null + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(String name) { + return getDouble(name, null); + } + + /** + * Returns a property as a Double + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(CompoundName name, Double defaultValue) { + return asDouble(get(name), defaultValue); + } + + /** + * Returns a property as a Double + * + * @param name the property name + * @param defaultValue the value to return if this property is null + * @return the integer value for this name + * @throws NumberFormatException if the given parameter exists but have a value which + * is not parseable as a number + */ + public final Double getDouble(String name, Double defaultValue) { + return asDouble(get(name), defaultValue); + } + + protected final Double asDouble(Object value, Double defaultValue) { + try { + if (value == null) + return defaultValue; + + if (value instanceof Double) + return (Double) value; + + String stringValue = value.toString(); + if (stringValue.isEmpty()) + return defaultValue; + + return Double.valueOf(value.toString()); + } catch (IllegalArgumentException e) { + throw new NumberFormatException("Not a valid double"); + } + } + + /** + * Clones this instance and recursively all chained instance. + * Implementations should call this and clone their own state as appropriate + */ + public Properties clone() { + try { + Properties clone = (Properties) super.clone(); + if (chained != null) + clone.chained = this.chained.clone(); + return clone; + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Will never happen"); + } + } + + /** Clones a map by deep cloning each value which is cloneable and shallow copying all other values. */ + public static Map<CompoundName, Object> cloneMap(Map<CompoundName, Object> map) { + return cloneHelper.cloneMap(map); + } + + /** Clones this object if it is clonable, and the clone is public. Returns null if not */ + public static Object clone(Object object) { + return cloneHelper.clone(object); + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/request/package-info.java b/container-core/src/main/java/com/yahoo/processing/request/package-info.java new file mode 100644 index 00000000000..96e82294075 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.request; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-core/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java b/container-core/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java new file mode 100644 index 00000000000..54e5aae42cc --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/properties/PropertyMap.java @@ -0,0 +1,74 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.properties; + +import com.yahoo.processing.request.CompoundName; +import com.yahoo.processing.request.Properties; + +import java.util.HashMap; +import java.util.Map; + +/** + * A HashMap backing of Properties. + * <p> + * When this is cloned it will deep copy not only the model object map, but also each + * clonable member inside the map. + * <p> + * Subclassing is supported, a hook can be implemented to provide conditional inclusion in the map. + * By default - all properties are accepted, so set is never propagated. + * <p> + * This class is not multithread safe. + * + * @author bratseth + */ +public class PropertyMap extends Properties { + + /** + * The properties of this + */ + private Map<CompoundName, Object> properties = new HashMap<>(); + + public void set(CompoundName name, Object value, Map<String, String> context) { + if (shouldSet(name, value)) + properties.put(name, value); + else + super.set(name, value, context); + } + + /** + * Return true if this value should be set in this map, false if the set should be propagated instead + * This default implementation always returns true. + */ + protected boolean shouldSet(CompoundName name, Object value) { + return true; + } + + public + @Override + Object get(CompoundName name, Map<String, String> context, + com.yahoo.processing.request.Properties substitution) { + if (!properties.containsKey(name)) return super.get(name, context, substitution); + return properties.get(name); + } + + public + @Override + PropertyMap clone() { + PropertyMap clone = (PropertyMap) super.clone(); + clone.properties = cloneMap(this.properties); + return clone; + } + + @Override + public Map<String, Object> listProperties(CompoundName path, Map<String, String> context, Properties substitution) { + Map<String, Object> map = super.listProperties(path, context, substitution); + + for (Map.Entry<CompoundName, Object> entry : properties.entrySet()) { + if ( ! entry.getKey().hasPrefix(path)) continue; + CompoundName propertyName = entry.getKey().rest(path.size()); + if (propertyName.isEmpty()) continue; + map.put(propertyName.toString(), entry.getValue()); + } + return map; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java b/container-core/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java new file mode 100644 index 00000000000..785c3f08fa8 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/properties/PublicCloneable.java @@ -0,0 +1,15 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.request.properties; + +/** + * This interface publicly exposes the clone method. + * Classes which are used in request properties may implement this to allow faster cloning of the request. + * + * @author bratseth + * @since 5.66 + */ +public interface PublicCloneable<T> extends Cloneable { + + public T clone(); + +} diff --git a/container-core/src/main/java/com/yahoo/processing/request/properties/package-info.java b/container-core/src/main/java/com/yahoo/processing/request/properties/package-info.java new file mode 100644 index 00000000000..bc0feb08411 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/request/properties/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.request.properties; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-core/src/main/java/com/yahoo/processing/response/AbstractData.java b/container-core/src/main/java/com/yahoo/processing/response/AbstractData.java new file mode 100644 index 00000000000..341c6c800a5 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/AbstractData.java @@ -0,0 +1,30 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.processing.Request; + +/** + * Convenience superclass for implementations of data. This contains no payload. + * + * @author bratseth + */ +public abstract class AbstractData extends ListenableFreezableClass implements Data { + + private Request request; + + /** + * Creates some data marked with the request that created it + */ + public AbstractData(Request request) { + this.request = request; + } + + /** + * Returns the request that created this data + */ + public Request request() { + return request; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java new file mode 100644 index 00000000000..150d1e25d0a --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/AbstractDataList.java @@ -0,0 +1,161 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.yahoo.component.provider.ListenableFreezableClass; +import com.yahoo.processing.Request; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A convenience superclass for dataList implementations which handles references to the request and to incoming data. + * + * @author bratseth + */ +public abstract class AbstractDataList<DATATYPE extends Data> extends ListenableFreezableClass implements DataList<DATATYPE>, Streamed, Ordered { + + private final boolean ordered; + private final boolean streamed; + + /** + * The request which caused this to be created + */ + private final Request request; + + /** + * The recipient of incoming data to this. Never null, but may be a null recipient. + */ + private final IncomingData<DATATYPE> incomingData; + + private final ListenableFuture<DataList<DATATYPE>> completedFuture; + + /** + * Creates a simple data list which does not allow late incoming data + * + * @param request the request which created this data list + */ + protected AbstractDataList(Request request) { + // Cannot call the constructor below because this must be given out below + this.request = request; + this.incomingData = new IncomingData.NullIncomingData<>(this); + this.completedFuture = new DrainOnGetFuture<>(this); + ordered = true; + streamed = true; + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected AbstractDataList(Request request, IncomingData<DATATYPE> incomingData) { + this(request, incomingData, true, true); + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected AbstractDataList(Request request, IncomingData<DATATYPE> incomingData, boolean ordered, boolean streamed) { + if (request == null) throw new NullPointerException("Request cannot be null"); + if (incomingData == null) throw new NullPointerException("incomingData cannot be null"); + + this.request = request; + this.incomingData = incomingData; + this.completedFuture = new DrainOnGetFuture<>(this); + this.ordered = ordered; + this.streamed = streamed; + } + + /** + * Returns the request which created this data + */ + public Request request() { + return request; + } + + /** + * Returns the holder of incoming data to this. + * This may be used to add, consume, wait for and be notified about incoming data. + * If this instance does not support late incoming data, the read methods of the return object behaves + * as expected and is synchronization free. The write methods throws an exception. + */ + public IncomingData<DATATYPE> incoming() { + return incomingData; + } + + public ListenableFuture<DataList<DATATYPE>> complete() { + return completedFuture; + } + + @Override + public boolean isOrdered() { return ordered; } + + @Override + public boolean isStreamed() { return streamed; } + + public String toString() { + return super.toString() + (complete().isDone() ? " [completed]" : " [incomplete, " + incoming() + "]"); + } + + public static final class DrainOnGetFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + + private final DataList<DATATYPE> owner; + + public DrainOnGetFuture(DataList<DATATYPE> owner) { + this.owner = owner; + } + + /** + * Returns false as this is not cancellable + */ + @Override + public boolean cancel(boolean b) { + return false; + } + + /** + * Returns false as this is not cancellable + */ + @Override + public boolean isCancelled() { + return false; + } + + /** + * Wait until all data is available. When this returns all data is available in the returned data list. + */ + @Override + public DataList<DATATYPE> get() throws InterruptedException, ExecutionException { + return drain(owner.incoming().completed().get()); + } + + /** + * Wait until all data is available. + * When and if this returns normally all data is available in the returned data list + */ + @Override + public DataList<DATATYPE> get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + return drain(owner.incoming().completed().get(timeout, timeUnit)); + } + + private DataList<DATATYPE> drain(DataList<DATATYPE> dataList) { + for (DATATYPE item : dataList.incoming().drain()) + dataList.add(item); + set(dataList); // Signal completion to listeners + return dataList; + } + + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/ArrayDataList.java b/container-core/src/main/java/com/yahoo/processing/response/ArrayDataList.java new file mode 100644 index 00000000000..8987b8998af --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/ArrayDataList.java @@ -0,0 +1,130 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.yahoo.collections.FreezableArrayList; +import com.yahoo.processing.Request; + +import java.util.List; + +/** + * A data list backed by an array. + * This implementation supports subclassing. + * + * @author bratseth + */ +public class ArrayDataList<DATATYPE extends Data> extends AbstractDataList<DATATYPE> { + + private final FreezableArrayList<DATATYPE> dataList = new FreezableArrayList<>(true); + + /** + * Creates a simple data list which does not allow late incoming data + * + * @param request the request which created this data list + */ + protected ArrayDataList(Request request) { + super(request); + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected ArrayDataList(Request request, IncomingData<DATATYPE> incomingData) { + this(request, incomingData, true, true); + } + + /** + * Creates a simple data list which receives incoming data in the given instance + * + * @param request the request which created this data list, never null + * @param incomingData the recipient of incoming data to this list, never null + */ + protected ArrayDataList(Request request, IncomingData<DATATYPE> incomingData, boolean ordered, boolean streamed) { + super(request, incomingData, ordered, streamed); + } + + /** + * Creates a simple data list which does not allow late incoming data + * + * @param request the request which created this data list + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> create(Request request) { + return new ArrayDataList<>(request); + } + + /** + * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData) + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsync(Request request) { + DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>(); + ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData); + incomingData.assignOwner(dataList); + return dataList; + } + + /** + * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData), + * and where this data can be rendered in any order. + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsyncUnordered(Request request) { + DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>(); + ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData, false, true); + incomingData.assignOwner(dataList); + return dataList; + } + + /** + * Creates an instance of this which supports incoming data through the default mechanism (DefaultIncomingData) + * and where this data cannot be returned to clients until this is completed. + */ + public static <DATATYPE extends Data> ArrayDataList<DATATYPE> createAsyncNonstreamed(Request request) { + DefaultIncomingData<DATATYPE> incomingData = new DefaultIncomingData<>(); + ArrayDataList<DATATYPE> dataList = new ArrayDataList<>(request, incomingData, true, false); + incomingData.assignOwner(dataList); + return dataList; + } + + public DATATYPE add(DATATYPE data) { + dataList.add(data); + return data; + } + + /** + * Returns the data element at index + */ + public DATATYPE get(int index) { + return dataList.get(index); + } + + /** + * Returns a reference to the list backing this. The list may be modified freely, + * unless this is frozen. If frozen, the only permissible write operations are those that + * add new items to the end of the list. + */ + public List<DATATYPE> asList() { + return dataList; + } + + @Override + public void addDataListener(Runnable runnable) { + dataList.addListener(runnable); + } + + /** + * Irreversibly prevent further changes to the items of this. + * This allows the processing engine to start streaming the current content of this list back to the + * client (if applicable). + * <p> + * Adding new items to the end of this list is permitted even after freeze. + * If frozen, those items may be streamed back to the client immediately on add. + * <p> + * Calling this on a frozen list has no effect. + */ + public void freeze() { + super.freeze(); + dataList.freeze(); + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/Data.java b/container-core/src/main/java/com/yahoo/processing/response/Data.java new file mode 100644 index 00000000000..ff48f1e86b6 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/Data.java @@ -0,0 +1,20 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.yahoo.component.provider.ListenableFreezable; +import com.yahoo.processing.Request; + +/** + * A data item created due to a processing request. + * <p> + * If a data item is <i>frozen</i> it is illegal to make further changes to its payload or referenced request. + * + * @author bratseth + */ +// TODO: Have DataList implement this instead, probably (should be a safe change in practise) +public interface Data extends ListenableFreezable { + + /** Returns the request that created this data */ + Request request(); + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/DataList.java b/container-core/src/main/java/com/yahoo/processing/response/DataList.java new file mode 100644 index 00000000000..ff67dd82aa7 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/DataList.java @@ -0,0 +1,94 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.List; +import java.util.concurrent.Executor; + +/** + * A list of data items created due to a processing request. + * This list is itself a data item, allowing data items to be organized into a composite tree. + * <p> + * A data list can be frozen even though its child data items are not. + * When a datalist is frozen the only permissible write operation is to add new items + * to the end of the list. + * <p> + * Content in a frozen list may be returned to the requesting client immediately by the underlying engine, + * even if the Response owning the list is not returned yet. + * + * @author bratseth + */ +public interface DataList<DATATYPE extends Data> extends Data { + + /** + * Adds a child data item to this. + * + * @param data the data to add to this + * @return the input data instance, for chaining + */ + DATATYPE add(DATATYPE data); + + DATATYPE get(int index); + + /** + * Returns the content of this as a List. + * The returned list is either a read-only snapshot or an editable reference to the content of this. + * If the returned list is editable and this is frozen, the only allowed operation is to add new items + * to the end of the list. + */ + List<DATATYPE> asList(); + + /** + * Returns the buffer of incoming/future data to this list. + * This can be used to provide data to this list from other threads, after its creation, + * and to consume, wait for, or be notified upon the arrival of such data. + * <p> + * Some list instances do not support late incoming data, + * such lists responds to <i>read</i> calls to IncomingData as expected and without + * incurring any synchronization, and throws an exception on <i>write</i> calls. + */ + IncomingData<DATATYPE> incoming(); + + /** + * Returns a future in which all incoming data in this has become available. + * This has two uses: + * <ul> + * <li>Calling {@link #get} on this future will block (if necessary) until all incoming data has arrived, + * transfer that data from the incoming buffer into this list and invoke any listeners on this event + * on the calling thread. + * <li>Adding a listener on this future will cause it to be called when completion().get() is called, <i>after</i> + * the incoming data has been moved to this thread and <i>before</i> the get() call returns. + * </ul> + * <p> + * Note that if no thread calls completed().get(), this future will never occur. + * <p> + * Any data list consumer who wishes to make sure it sees the complete data for this list + * <i>must</i> call <code>dataList.complete().get()</code> before consuming this list. + * If a guaranteed non-blocking call to this method is desired, register a listener on the future where all + * data is available for draining (that is, on <code>dataList.incoming().completed()</code>) + * and resume by calling this method from the listener. + * <p> + * Making this call on a list which does not support future data always returns immediately and + * causes no memory synchronization cost. + */ + ListenableFuture<DataList<DATATYPE>> complete(); + + /** + * Adds a listener which is invoked every time data is added to this list. + * The listener is always invoked on the same thread which is adding the data, + * and hence it can modify this list freely without synchronization. + */ + void addDataListener(Runnable runnable); + + /** + * Notify this list that is will never be accessed again, neither for read nor write. + * Implementations can override this as an optimization to release any data held in the list + * for garbage collection. + * + * This default implementation does nothing. + */ + default void close() {}; + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java new file mode 100644 index 00000000000..c436f92f78b --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/DefaultIncomingData.java @@ -0,0 +1,131 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.collections.Tuple2; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; + +/** + * The default incoming data implementation + * + * @author bratseth + */ +public class DefaultIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> { + + private DataList<DATATYPE> owner = null; + + private final SettableFuture<DataList<DATATYPE>> completionFuture; + + private final List<DATATYPE> dataList = new ArrayList<>(); + + private List<Tuple2<Runnable,Executor>> newDataListeners = null; + + /** Whether this is completed, such that no more data can be added */ + private boolean complete = false; + + /** Creates an instance which must be assigned an owner after creation */ + public DefaultIncomingData() { + this(null); + } + + public DefaultIncomingData(DataList<DATATYPE> owner) { + assignOwner(owner); + completionFuture = SettableFuture.create(); + } + + /** Assigns the owner of this. Throws an exception if the owner is already set. */ + public final void assignOwner(DataList<DATATYPE> owner) { + if (this.owner != null) throw new NullPointerException("Owner of " + this + " was already assigned"); + this.owner = owner; + } + + @Override + public DataList<DATATYPE> getOwner() { + return owner; + } + + @Override + public ListenableFuture<DataList<DATATYPE>> completed() { + return completionFuture; + } + + /** Returns whether the data in this is complete */ + @Override + public synchronized boolean isComplete() { + return complete; + } + + /** Adds new data and marks this as completed */ + @Override + public synchronized void addLast(DATATYPE data) { + addLast(Collections.singletonList(data)); + } + + /** Adds new data without completing this */ + @Override + public synchronized void add(DATATYPE data) { + add(Collections.singletonList(data)); + } + + /** Adds new data and marks this as completed */ + @Override + public synchronized void addLast(List<DATATYPE> data) { + add(data); + markComplete(); + } + + /** Adds new data without completing this */ + @Override + public synchronized void add(List<DATATYPE> data) { + if (complete) throw new IllegalStateException("Attempted to add data to completed " + this); + + dataList.addAll(data); + notifyDataListeners(); + } + + /** Marks this as completed and notify any listeners */ + @Override + public synchronized void markComplete() { + complete = true; + completionFuture.set(owner); + } + + /** + * Gets and removes all the data currently available in this. + * The returned list is a modifiable fresh instance owned by the caller. + */ + public synchronized List<DATATYPE> drain() { + List<DATATYPE> dataListCopy = new ArrayList<>(dataList); + dataList.clear(); + return dataListCopy; + } + + @Override + public void addNewDataListener(Runnable listener, Executor executor) { + synchronized (this) { + if (newDataListeners == null) + newDataListeners = new ArrayList<>(); + newDataListeners.add(new Tuple2<>(listener, executor)); + if (dataList.isEmpty()) return; + } + notifyDataListeners(); + } + + private void notifyDataListeners() { + if (newDataListeners == null) return; + for (Tuple2<Runnable, Executor> listener : newDataListeners) { + listener.second.execute(listener.first); + } + } + + @Override + public String toString() { + return "incoming: " + (complete ? "complete" : "incomplete") + ", data " + dataList; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java new file mode 100644 index 00000000000..21877dfc8c3 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/FutureResponse.java @@ -0,0 +1,82 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.ForwardingFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.request.ErrorMessage; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A processing response which will arrive in the future. + * + * @author bratseth + */ +public class FutureResponse extends ForwardingFuture<Response> { + + private final Request request; + + /** + * Only used for generating messages + */ + private final Execution execution; + + private final static Logger log = Logger.getLogger(FutureResponse.class.getName()); + + private final ListenableFutureTask<Response> futureTask; + + public FutureResponse(final Callable<Response> callable, Execution execution, final Request request) { + this.futureTask = ListenableFutureTask.create(callable); + this.request = request; + this.execution = execution; + } + + @Override + public ListenableFutureTask<Response> delegate() { + return futureTask; + } + + public + @Override + Response get() { + try { + return super.get(); + } catch (InterruptedException e) { + return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); + } catch (ExecutionException e) { + log.log(Level.WARNING, "Exception on executing " + execution + " for " + request, e); + return new Response(request, new ErrorMessage("Error in '" + execution + "'", e)); + } + } + + public + @Override + Response get(long timeout, TimeUnit timeunit) { + try { + return super.get(timeout, timeunit); + } catch (InterruptedException e) { + return new Response(request, new ErrorMessage("'" + execution + "' was interrupted", e)); + } catch (ExecutionException e) { + log.log(Level.WARNING, "Exception on executing " + execution + " for " + request, e); + return new Response(request, new ErrorMessage("Error in '" + execution + "'", e)); + } catch (TimeoutException e) { + return new Response(request, new ErrorMessage("Error executing '" + execution + "': " + " Chain timed out.")); + } + } + + /** + * Returns the query used in this execution, never null + */ + public Request getRequest() { + return request; + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java new file mode 100644 index 00000000000..b8cdf8683bc --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/IncomingData.java @@ -0,0 +1,219 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +/** + * A data list own once instance of this which can be used to provide data asynchronously to the list, + * and consume, wait for or be notified upon the arrival of such data. + * + * @author bratseth + */ +public interface IncomingData<DATATYPE extends Data> { + + /** + * Returns the owner (target DataList) of this. + * Note that accessing the owner from the thread producing incoming data + * is generally *not* thread safe. + */ + DataList<DATATYPE> getOwner(); + + /** + * Returns a future in which all the incoming data that will be produced in this is available. + * Listeners on this are invoked on the thread producing the incoming data (or a thread spawned from it), + * which in general is separate from the thread using the data list. Hence, listeners on this even cannot + * in general assume that they may modify the data list or the request. + * <p> + * The data is not {@link #drain drained} into the owner of this by this method. That must be done + * by the thread using the data list. + * <p> + * This return the list owning this for convenience. + */ + ListenableFuture<DataList<DATATYPE>> completed(); + + /** + * Returns whether this is complete + */ + boolean isComplete(); + + /** + * Add new data and mark this as completed + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + void addLast(DATATYPE data); + + /** + * Add new data without completing this + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + void add(DATATYPE data); + + /** + * Add new data and mark this as completed + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + void addLast(List<DATATYPE> data); + + /** + * Add new data without completing this. + * + * @throws IllegalStateException if this is already complete or does not allow writes + */ + void add(List<DATATYPE> data); + + /** + * Mark this as completed and notify any listeners. If this is already complete this method does nothing. + */ + void markComplete(); + + /** + * Get and remove all the data currently available in this + */ + List<DATATYPE> drain(); + + /** + * Add a listener which will be invoked every time new data is added to this. + * This listener may be invoked at any time in any thread, any thread synchronization is left + * to the listener itself + */ + void addNewDataListener(Runnable listener, Executor executor); + + /** + * Creates a null implementation of this which is empty and complete at creation: + * <ul> + * <li>Provides immediate return without incurring any memory synchronization for + * any read method. + * <li>Throws an exception on any write method + * </ul> + * <p> + * This allows consumers to check for completion the same way whether or not the data list in question + * supports asynchronous addition of data, and without incurring unnecessary costs. + */ + final class NullIncomingData<DATATYPE extends Data> implements IncomingData<DATATYPE> { + + private DataList<DATATYPE> owner; + private final ImmediateFuture<DATATYPE> completionFuture; + + public NullIncomingData(DataList<DATATYPE> owner) { + this.owner = owner; + completionFuture = new ImmediateFuture<>(owner); + } + + public ListenableFuture<DataList<DATATYPE>> completed() { + return completionFuture; + } + + @Override + public DataList<DATATYPE> getOwner() { + return owner; + } + + /** + * Returns true + */ + @Override + public boolean isComplete() { + return true; + } + + /** + * @throws IllegalStateException as this is read only + */ + public void addLast(DATATYPE data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * @throws IllegalStateException as this is read only + */ + public void add(DATATYPE data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * @throws IllegalStateException as this is read only + */ + public void addLast(List<DATATYPE> data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * @throws IllegalStateException as this is read only + */ + public void add(List<DATATYPE> data) { + throw new IllegalStateException(owner + " does not support adding data asynchronously"); + } + + /** + * Do nothing as this is already complete + */ + public void markComplete() { + } + + public List<DATATYPE> drain() { + return Collections.emptyList(); + } + + /** + * Adds a new data listener to this - this is a no-op + * as new data can never be added to this implementation. + */ + public void addNewDataListener(Runnable listener, Executor executor) { } + + public String toString() { + return "(no incoming)"; + } + + /** + * A future which is always done and incurs no synchronization. + * This is semantically the same as Futures.immediateFuture but contrary to it, + * this never causes any memory synchronization when accessed. + */ + public static class ImmediateFuture<DATATYPE extends Data> extends AbstractFuture<DataList<DATATYPE>> { + + private DataList<DATATYPE> owner; + + public ImmediateFuture(DataList<DATATYPE> owner) { + this.owner = owner; // keep here to avoid memory synchronization for access + set(owner); // Signal completion (for future listeners) + } + + @Override + public boolean cancel(boolean b) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public DataList<DATATYPE> get() { + return owner; + } + + @Override + public DataList<DATATYPE> get(long l, TimeUnit timeUnit) { + return owner; + } + + } + + } + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/Ordered.java b/container-core/src/main/java/com/yahoo/processing/response/Ordered.java new file mode 100644 index 00000000000..dc969f7acef --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/Ordered.java @@ -0,0 +1,18 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +/** + * This is an <i>optional marker interface</i>. + * DataLists may implement this to return false to indicate that the order of the elements of + * the list is insignificant. The usage of this is to allow the content of a list to be rendered in the order + * in which it completes rather than in the order in which it is added to the list. + * + * @author bratseth + * @since 5.1.19 + */ +public interface Ordered { + + /** Returns false if the data in this list can be returned in any order. Default: true, meaning the order matters */ + public boolean isOrdered(); + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/Streamed.java b/container-core/src/main/java/com/yahoo/processing/response/Streamed.java new file mode 100644 index 00000000000..2aae03104be --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/Streamed.java @@ -0,0 +1,21 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.response; + +/** + * This is an <i>optional marker interface</i>. + * DataLists may implement this to return false to indicate that no data from the list should be returned to clients + * until it is completed. This is useful in cases where some decision making which may impact the content of the list + * must be deferred until the list is complete. + * + * @author bratseth + * @since 5.1.19 + */ +public interface Streamed { + + /** + * Returns false if the data in this list can not be returned until it is completed. + * Default: true, meaning eager streaming of the data is permissible. + */ + public boolean isStreamed(); + +} diff --git a/container-core/src/main/java/com/yahoo/processing/response/package-info.java b/container-core/src/main/java/com/yahoo/processing/response/package-info.java new file mode 100644 index 00000000000..204b0e04393 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/response/package-info.java @@ -0,0 +1,6 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +@PublicApi package com.yahoo.processing.response; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; diff --git a/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java new file mode 100644 index 00000000000..5f6201c6f2d --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/test/ProcessorLibrary.java @@ -0,0 +1,556 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import com.yahoo.component.chain.Chain; +import com.yahoo.processing.Processor; +import com.yahoo.processing.Request; +import com.yahoo.processing.Response; +import com.yahoo.processing.execution.AsyncExecution; +import com.yahoo.processing.execution.Execution; +import com.yahoo.processing.execution.ExecutionWithResponse; +import com.yahoo.processing.execution.RunnableExecution; +import com.yahoo.processing.request.ErrorMessage; +import com.yahoo.processing.response.*; + +import java.util.*; +import java.util.concurrent.ExecutionException; + +/** + * A collection of processors for test purposes. + * + * @author bratseth + */ +public class ProcessorLibrary { + + private ProcessorLibrary() { + } + + // ---------------------------------------- Data types + + public static class StringData extends AbstractData { + + private String string; + + public StringData(Request request, String string) { + super(request); + this.string = string; + } + + public void setString(String string) { + this.string = string; + } + + @Override + public String toString() { + return string; + } + + } + + public static class MapData extends AbstractData { + + private Map map = new LinkedHashMap(); + + public MapData(Request request) { + super(request); + } + + public Map map() { return map; } + + @Override + public String toString() { + return "map data: " + map; + } + + } + + // ---------------------------------------- DataLists + + public static class UnorderedArrayDataList extends ArrayDataList implements Ordered { + + public UnorderedArrayDataList(Request request) { + super(request); + } + + @Override + public boolean isOrdered() {return false; } + + } + + // ---------------------------------------- Processors + + /** + * Makes some modifications to the request, passes it on and finally removes one data item from the response + */ + public static class CombineData extends Processor { + + public Response process(Request request, Execution execution) { + request.properties().set("appendage", request.properties().getInteger("appendage") + 1); + Response response = execution.process(request); + + // Modify the response + StringData first = (StringData) response.data().get(0); + StringData third = (StringData) response.data().get(2); + first.setString(first.toString() + ", " + third.toString()); + response.data().asList().remove(2); + return response; + } + + } + + /** + * Sends the request multiple times to get at least 6 pieces of data + */ + public static class Get6DataItems extends Processor { + + @SuppressWarnings("unchecked") + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + while (response.data().asList().size() < 6) { + request.properties().set("appendage", request.properties().getInteger("appendage") + 1); + Response additional = execution.process(request); + response.mergeWith(additional); + response.data().asList().addAll(additional.data().asList()); + } + return response; + } + + } + + /** + * Produces 3 pieces of string data + */ + public static class DataSource extends Processor { + + @SuppressWarnings("unchecked") + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + response.data().add(new StringData(request, "first." + request.properties().get("appendage"))); + response.data().add(new StringData(request, "second." + request.properties().get("appendage"))); + response.data().add(new StringData(request, "third." + request.properties().get("appendage"))); + return response; + } + + } + + public static class Federator extends Processor { + + private final List<Chain<? extends Processor>> chains; + + private final boolean ordered; + + /** + * Federates over the given chains. Returns an ordered response. + */ + @SafeVarargs + public Federator(Chain<? extends Processor>... chains) { + this(true, chains); + } + + /** + * Federates over the given chains + * + * @param ordered true if the returned list should be ordered (default), false if it should be permissible + * to render the datalist from each federated source in the order it completes. + */ + @SafeVarargs + @SuppressWarnings("varargs") + public Federator(boolean ordered, Chain<? extends Processor>... chains) { + this.chains = Arrays.asList(chains); + this.ordered = ordered; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = ordered ? new Response(request) : new Response(new UnorderedArrayDataList(request)); + + List<FutureResponse> futureResponses = new ArrayList<>(chains.size()); + for (Chain<? extends Processor> chain : chains) { + + futureResponses.add(new AsyncExecution(chain, execution).process(request.clone())); + } + AsyncExecution.waitForAll(futureResponses, 1000); + for (FutureResponse futureResponse : futureResponses) { + Response federatedResponse = futureResponse.get(); + response.data().add(federatedResponse.data()); + response.mergeWith(federatedResponse); + } + return response; + } + } + + /** + * A federator which supports returning frozen data from each chain before the response is returned. + */ + public static class EagerReturnFederator extends Processor { + + private final List<Chain<? extends Processor>> chains; + + private final boolean ordered; + + /** + * Federates over the given chains. Returns an ordered response. + */ + @SafeVarargs + public EagerReturnFederator(Chain<? extends Processor>... chains) { + this(true, chains); + } + + /** + * Federates over the given chains + * + * @param ordered true if the returned list should be ordered (default), false if it should be permissible + * to render the datalist from each federated source in the order it completes. + */ + @SafeVarargs + @SuppressWarnings("varargs") + public EagerReturnFederator(boolean ordered, Chain<? extends Processor>... chains) { + this.chains = Arrays.asList(chains); + this.ordered = ordered; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + List<FutureResponse> futureResponses = new ArrayList<>(chains.size()); + for (Chain<? extends Processor> chain : chains) { + futureResponses.add(new AsyncExecution(chain, execution).process(request.clone())); + } + AsyncExecution.waitForAll(futureResponses, 1000); + Response response = ordered ? new Response(request) : new Response(new UnorderedArrayDataList(request)); + for (FutureResponse futureResponse : futureResponses) { + Response federatedResponse = futureResponse.get(); + response.data().add(federatedResponse.data()); + response.mergeWith(federatedResponse); + } + return response; + } + } + + /** + * Adds a data element containing the (recursive) count of concrete (non-list) data elements in the response + */ + public static class DataCounter extends Processor { + + private String prefix = ""; + + public DataCounter() { + } + + /** + * The prefix "[name] " is prepended to the string data + */ + public DataCounter(String name) { + prefix = "[" + name + "] "; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + int dataCount = countData(response.data()); + response.data().add(new StringData(request, prefix + "Data count: " + dataCount)); + return response; + } + + private int countData(DataList<? extends Data> dataList) { + int count = 0; + for (Data data : dataList.asList()) { + if (data instanceof DataList) + count += countData((DataList<?>) data); + else + count++; + } + return count; + } + } + + // TODO: Replace by below? + public static class FutureDataSource extends Processor { + + /** The list of incoming data this has created */ + public final List<IncomingData> incomingData = new ArrayList<>(); + + @Override + public Response process(Request request, Execution execution) { + ArrayDataList dataList = ArrayDataList.createAsync(request); + incomingData.add(dataList.incoming()); + return new Response(dataList); + } + + } + + /** Allows waiting for that request to happen. */ + public static class ListenableFutureDataSource extends Processor { + + private final boolean ordered, streamed; + + /** The incoming data this has created */ + public final SettableFuture<IncomingData> incomingData = SettableFuture.create(); + + /** Create an instance which returns ordered, streamable data */ + public ListenableFutureDataSource() { this(true, true); } + + public ListenableFutureDataSource(boolean ordered, boolean streamed) { + this.ordered = ordered; + this.streamed = streamed; + } + + @Override + public Response process(Request request, Execution execution) { + ArrayDataList dataList; + if (! ordered) + dataList = ArrayDataList.createAsyncUnordered(request); + else if (! streamed) + dataList = ArrayDataList.createAsyncNonstreamed(request); + else + dataList = ArrayDataList.createAsync(request); + incomingData.set(dataList.incoming()); + return new Response(dataList); + } + + } + + /** Allows waiting for that request to happen. */ + public static class RequestCounter extends Processor { + + /** The incoming data this has created */ + public final SettableFuture<IncomingData> incomingData = SettableFuture.create(); + + @Override + public Response process(Request request, Execution execution) { + ArrayDataList dataList = ArrayDataList.createAsync(request); + incomingData.set(dataList.incoming()); + return new Response(dataList); + } + + } + + /** + * Multiples the amount of data returned by parallelism by performing parallel executions of the rest of the chain + */ + public static class BlockingSplitter extends Processor { + + private final int parallelism; + + public BlockingSplitter(int parallelism) { + this.parallelism = parallelism; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + try { + // start executions in other threads + List<FutureResponse> futures = new ArrayList<>(parallelism - 1); + for (int i = 1; i < parallelism; i++) { + futures.add(new AsyncExecution(execution).process(request.clone())); + } + + // complete this execution + Response response = execution.process(request); + + // wait for other executions and merge the responses + for (Response additionalResponse : AsyncExecution.waitForAll(futures, 1000)) { + additionalResponse.data().complete().get(); // block until we have all the data elements + for (Object item : additionalResponse.data().asList()) + response.data().add((Data) item); + response.mergeWith(additionalResponse); + } + return response; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + } + + /** + * Registers an async processing of the chain given in the constructor on completion of the data in the response + */ + public static class AsyncDataProcessingInitiator extends Processor { + + private final Chain<Processor> asyncChain; + + public AsyncDataProcessingInitiator(Chain<Processor> asyncChain) { + this.asyncChain = asyncChain; + } + + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + // TODO: Consider for to best provide helpers for this + response.data().complete().addListener(new RunnableExecution(request, + new ExecutionWithResponse(asyncChain, response, execution)), + MoreExecutors.directExecutor()); + return response; + } + + } + + /** + * Registers a chain to be invoked each time new data becomes available in the first child list + */ + public static class StreamProcessingInitiator extends Processor { + + private final Chain<Processor> streamChain; + + public StreamProcessingInitiator(Chain<Processor> streamChain) { + this.streamChain = streamChain; + } + + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + // TODO: Consider for to best provide helpers for this + response.data().addDataListener(new RunnableExecution(request, + new ExecutionWithResponse(streamChain, response, execution))); + return response; + } + + } + + /** + * A processor which on invocation prints the string given on construction + */ + public static class Echo extends Processor { + + private String s; + + public Echo(String s) { + this.s = s; + } + + @Override + public Response process(Request request, Execution execution) { + System.out.println(s); + return execution.process(request); + } + + } + + /** + * A processor which adds a StringData item containing the string given in the constructor to every response + */ + public static class StringDataAdder extends Processor { + + private String string; + + public StringDataAdder(String string) { + this.string = string; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + response.data().add(new StringData(request, string)); + return response; + } + + } + + /** + * A processor which adds an ErrorMessage to the request of the top level + * data of each returned response. + */ + public static class ErrorAdder extends Processor { + + private ErrorMessage errorMessage; + + public ErrorAdder(ErrorMessage errorMessage) { + this.errorMessage = errorMessage; + } + + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + response.data().request().errors().add(errorMessage); + return response; + } + + } + + /** + * A processor which adds a List of StringData items containing the strings given in the constructor to every response + */ + public static class StringDataListAdder extends Processor { + + private String[] strings; + + public StringDataListAdder(String... strings) { + this.strings = strings; + } + + @SuppressWarnings("unchecked") + @Override + public Response process(Request request, Execution execution) { + Response response = execution.process(request); + DataList<StringData> list = ArrayDataList.create(request); + for (String string : strings) + list.add(new StringData(request, string)); + response.data().add(list); + return response; + } + + } + + /** + * Adds a the given trace message at the given trace level + */ + public static class Trace extends Processor { + + private String traceMessage; + private int traceLevel; + + public Trace(String traceMessage, int traceLevel) { + this.traceMessage = traceMessage; + this.traceLevel = traceLevel; + } + + @Override + public Response process(Request request, Execution execution) { + execution.trace().trace(traceMessage, traceLevel); + return execution.process(request); + } + + } + + public static final class StatusSetter extends Processor { + + private final int status; + + public StatusSetter(int status) { + this.status = status; + } + + @Override + public com.yahoo.processing.Response process(com.yahoo.processing.Request request, Execution execution) { + request.errors().add(new ErrorMessage(status, "")); + return execution.process(request); + } + + } + + /** + * Adds (key, value) to the log value trace. + */ + public static class LogValueAdder extends Processor { + private final String key; + private final String value; + + public LogValueAdder(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public Response process(Request request, Execution execution) { + execution.trace().logValue(key, value); + return execution.process(request); + } + } +} diff --git a/container-core/src/main/java/com/yahoo/processing/test/Responses.java b/container-core/src/main/java/com/yahoo/processing/test/Responses.java new file mode 100644 index 00000000000..cabce8fc109 --- /dev/null +++ b/container-core/src/main/java/com/yahoo/processing/test/Responses.java @@ -0,0 +1,32 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.processing.test; + +import com.yahoo.processing.response.Data; +import com.yahoo.processing.response.DataList; + +/** + * Static utilities + * + * @author bratseth + * @since 5.1.13 + */ +public class Responses { + + /** + * Returns a data item as a recursively indented string + */ + public static String recursiveToString(Data data) { + StringBuilder b = new StringBuilder(); + asString(data, b, ""); + return b.toString(); + } + + private static void asString(Data data, StringBuilder b, String indent) { + b.append(indent).append(data).append("\n"); + if (!(data instanceof DataList)) return; + for (Data childData : ((DataList<? extends Data>) data).asList()) { + asString(childData, b, indent.concat(" ")); + } + } + +} |