// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.processing.response; import com.yahoo.processing.impl.ProcessingFuture; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** * A data list own once instance of this which can be used to provide data asynchronously to the list, * and consume, wait for or be notified upon the arrival of such data. * * @author bratseth */ public interface IncomingData { /** * Returns the owner (target DataList) of this. * Note that accessing the owner from the thread producing incoming data * is generally *not* thread safe. */ DataList getOwner(); /** * Returns a future in which all the incoming data that will be produced in this is available. * Listeners on this are invoked on the thread producing the incoming data (or a thread spawned from it), * which in general is separate from the thread using the data list. Hence, listeners on this even cannot * in general assume that they may modify the data list or the request. *

* The data is not {@link #drain drained} into the owner of this by this method. That must be done * by the thread using the data list. *

* This return the list owning this for convenience. */ CompletableFuture> completedFuture(); /** * Returns whether this is complete */ boolean isComplete(); /** * Add new data and mark this as completed * * @throws IllegalStateException if this is already complete or does not allow writes */ void addLast(DATATYPE data); /** * Add new data without completing this * * @throws IllegalStateException if this is already complete or does not allow writes */ void add(DATATYPE data); /** * Add new data and mark this as completed * * @throws IllegalStateException if this is already complete or does not allow writes */ void addLast(List data); /** * Add new data without completing this. * * @throws IllegalStateException if this is already complete or does not allow writes */ void add(List data); /** * Mark this as completed and notify any listeners. If this is already complete this method does nothing. */ void markComplete(); /** * Get and remove all the data currently available in this */ List drain(); /** * Add a listener which will be invoked every time new data is added to this. * This listener may be invoked at any time in any thread, any thread synchronization is left * to the listener itself */ void addNewDataListener(Runnable listener, Executor executor); /** * Creates a null implementation of this which is empty and complete at creation: *

*

* This allows consumers to check for completion the same way whether or not the data list in question * supports asynchronous addition of data, and without incurring unnecessary costs. */ final class NullIncomingData implements IncomingData { private final DataList owner; private final ImmediateFuture completionFuture; public NullIncomingData(DataList owner) { this.owner = owner; completionFuture = new ImmediateFuture<>(owner); } @Override public CompletableFuture> completedFuture() { return completionFuture; } @Override public DataList getOwner() { return owner; } /** * Returns true */ @Override public boolean isComplete() { return true; } /** * @throws IllegalStateException as this is read only */ public void addLast(DATATYPE data) { throw new IllegalStateException(owner + " does not support adding data asynchronously"); } /** * @throws IllegalStateException as this is read only */ public void add(DATATYPE data) { throw new IllegalStateException(owner + " does not support adding data asynchronously"); } /** * @throws IllegalStateException as this is read only */ public void addLast(List data) { throw new IllegalStateException(owner + " does not support adding data asynchronously"); } /** * @throws IllegalStateException as this is read only */ public void add(List data) { throw new IllegalStateException(owner + " does not support adding data asynchronously"); } /** * Do nothing as this is already complete */ public void markComplete() { } public List drain() { return Collections.emptyList(); } /** * Adds a new data listener to this - this is a no-op * as new data can never be added to this implementation. */ public void addNewDataListener(Runnable listener, Executor executor) { } public String toString() { return "(no incoming)"; } /** * A future which is always done and incurs no synchronization. * This is semantically the same as Futures.immediateFuture but contrary to it, * this never causes any memory synchronization when accessed. */ public static class ImmediateFuture extends ProcessingFuture> { private final DataList owner; public ImmediateFuture(DataList owner) { this.owner = owner; // keep here to avoid memory synchronization for access complete(owner); // Signal completion (for future listeners) } @Override public boolean cancel(boolean b) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return true; } @Override public DataList get() { return owner; } @Override public DataList get(long l, TimeUnit timeUnit) { return owner; } } } }