diff options
Diffstat (limited to 'vespajlib/src/main/java/com/yahoo/concurrent')
11 files changed, 1073 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/CopyOnWriteHashMap.java b/vespajlib/src/main/java/com/yahoo/concurrent/CopyOnWriteHashMap.java new file mode 100644 index 00000000000..e15a3734094 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/CopyOnWriteHashMap.java @@ -0,0 +1,94 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * <p>This is a thread hash map for small collections that are stable once built. Until it is stable there will be a + * race among all threads missing something in the map. They will then clone the map add the missing stuff and then put + * it back as active again. Here are no locks, but the cost is that inserts will happen a lot more than necessary. The + * map reference is volatile, but on most multicpu machines that has no cost unless modified.</p> + * + * @author <a href="mailto:balder@yahoo-inc.com">Henning Baldersheim</a> + * @since 5.2 + */ +public class CopyOnWriteHashMap<K, V> implements Map<K, V> { + + private volatile HashMap<K, V> map = new HashMap<>(); + + @Override + public int size() { + return map.size(); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return map.containsKey(key); + } + + @Override + public boolean containsValue(Object value) { + return map.containsValue(value); + } + + @Override + public V get(Object key) { + return map.get(key); + } + + @Override + public V put(K key, V value) { + HashMap<K, V> next = new HashMap<>(map); + V old = next.put(key, value); + map = next; + return old; + } + + @Override + @SuppressWarnings("SuspiciousMethodCalls") + public V remove(Object key) { + HashMap<K, V> prev = map; + if (!prev.containsKey(key)) { + return null; + } + HashMap<K, V> next = new HashMap<>(prev); + V old = next.remove(key); + map = next; + return old; + } + + @Override + public void putAll(Map<? extends K, ? extends V> m) { + HashMap<K, V> next = new HashMap<>(map); + next.putAll(m); + map = next; + } + + @Override + public void clear() { + map = new HashMap<>(); + } + + @Override + public Set<K> keySet() { + return map.keySet(); + } + + @Override + public Collection<V> values() { + return map.values(); + } + + @Override + public Set<Entry<K, V>> entrySet() { + return map.entrySet(); + } +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/DaemonThreadFactory.java b/vespajlib/src/main/java/com/yahoo/concurrent/DaemonThreadFactory.java new file mode 100644 index 00000000000..38c5bafc0d6 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/DaemonThreadFactory.java @@ -0,0 +1,48 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * A simple thread factory that decorates <code>Executors.defaultThreadFactory()</code> + * and sets all created threads to be daemon threads. + * + * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a> + */ +public class DaemonThreadFactory implements ThreadFactory { + private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory(); + private String prefix = null; + + /** + * Creates a deamon thread factory that creates threads with the default names + * provided by <code>Executors.defaultThreadFactory()</code>. + */ + public DaemonThreadFactory() { + } + + /** + * Creates a deamon thread factory that creates threads with the default names + * provided by <code>Executors.defaultThreadFactory()</code> prepended by the + * specified prefix. + * + * @param prefix the thread name prefix to use + */ + public DaemonThreadFactory(String prefix) { + this.prefix = prefix; + } + + public String getPrefix() { + return prefix; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread t = defaultThreadFactory.newThread(runnable); + t.setDaemon(true); + if (prefix != null) { + t.setName(prefix + t.getName()); + } + return t; + } +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/EventBarrier.java b/vespajlib/src/main/java/com/yahoo/concurrent/EventBarrier.java new file mode 100644 index 00000000000..389fe8a85ea --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/EventBarrier.java @@ -0,0 +1,140 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent;
+
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Reference implementation of the 'Incremental Minimal Event Barrier'
+ * algorithm. An event in this context is defined to be something that
+ * happens during a time interval. An event barrier is a time interval
+ * for which events may start before or end after, but not both. The
+ * problem solved by the algorithm is to determine the minimal event
+ * barrier starting at a given time. In other words; wait for the
+ * currently active events to complete. The most natural use of this
+ * algorithm would be to make a thread wait for events happening in
+ * other threads to complete.
+ *
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class EventBarrier {
+
+ private final List<Entry> queue = new LinkedList<>();
+ private int barrierToken = 0;
+ private int eventCount = 0;
+
+ /**
+ * At creation there are no active events and no pending barriers.
+ */
+ public EventBarrier() {
+ // empty
+ }
+
+ /**
+ * Obtain the current number of active events. This method is
+ * intended for testing and debugging.
+ *
+ * @return Number of active events.
+ */
+ int getNumEvents() {
+ int cnt = eventCount;
+ for (Entry entry : queue) {
+ cnt += entry.eventCount;
+ }
+ return cnt;
+ }
+
+ /**
+ * Obtain the current number of pending barriers. This method is
+ * intended for testing and debugging.
+ *
+ * @return Number of pending barriers.
+ */
+ int getNumBarriers() {
+ return queue.size();
+ }
+
+ /**
+ * Signal the start of an event. The value returned from this
+ * method must later be passed to the completeEvent method when
+ * signaling the completion of the event.
+ *
+ * @return Opaque token identifying the started event.
+ */
+ public int startEvent() {
+ ++eventCount;
+ return barrierToken;
+ }
+
+ /**
+ * Signal the completion of an event. The value passed to this
+ * method must be the same as the return value previously obtained
+ * from the startEvent method. This method will signal the
+ * completion of all pending barriers that were completed by the
+ * completion of this event.
+ *
+ * @param token Opaque token identifying the completed event.
+ */
+ public void completeEvent(int token) {
+ if (token == this.barrierToken) {
+ --eventCount;
+ return;
+ }
+ --queue.get(queue.size() - (this.barrierToken - token)).eventCount;
+ while (!queue.isEmpty() && queue.get(0).eventCount == 0) {
+ queue.remove(0).handler.completeBarrier();
+ }
+ }
+
+ /**
+ * Initiate the detection of the minimal event barrier starting
+ * now. If this method returns false it means that no events were
+ * currently active and the minimal event barrier was infinitely
+ * small. If this method returns false the handler will not be
+ * notified of the completion of the barrier. If this method
+ * returns true it means that the started barrier is pending and
+ * that the handler passed to this method will be notified of its
+ * completion at a later time.
+ *
+ * @param handler Handler notified of the completion of the barrier.
+ * @return True if a barrier was started, false if no events were active.
+ */
+ public boolean startBarrier(BarrierWaiter handler) {
+ if (eventCount == 0 && queue.isEmpty()) {
+ return false;
+ }
+ queue.add(new Entry(eventCount, handler));
+ ++barrierToken;
+ eventCount = 0;
+ return true;
+ }
+
+ /**
+ * Declares the interface required to wait for the detection of a
+ * minimal event barrier. An object that implements this is passed
+ * to the {@link EventBarrier#startBarrier(BarrierWaiter)}.
+ */
+ public interface BarrierWaiter {
+
+ /**
+ * Callback invoked by the thread that detected the minimal
+ * event barrier. Once this is called, all events taking place
+ * at or before the corresponding call to {@link
+ * EventBarrier#startBarrier(BarrierWaiter)} have ended.
+ */
+ public void completeBarrier();
+ }
+
+ private static class Entry {
+
+ int eventCount;
+ final BarrierWaiter handler;
+
+ Entry(int eventCount, BarrierWaiter handler) {
+ this.eventCount = eventCount;
+ this.handler = handler;
+ }
+ }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/LocalInstance.java b/vespajlib/src/main/java/com/yahoo/concurrent/LocalInstance.java new file mode 100644 index 00000000000..c2d19831810 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/LocalInstance.java @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import com.yahoo.concurrent.ThreadLocalDirectory.ObservableUpdater; +import com.yahoo.concurrent.ThreadLocalDirectory.Updater; + +/** + * Only for use along with ThreadLocalDirectory. A thread local data container + * instance. The class is visible to avoid indirection through the internal + * {@link ThreadLocal} in ThreadLocalDirectory if possible, but has no user + * available methods. + * + * @param AGGREGATOR + * the structure to insert produced data into + * @param SAMPLE + * type of produced data to insert from each participating thread + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public final class LocalInstance<AGGREGATOR, SAMPLE> { + /** + * The current generation of data produced from a single thread, where + * generation is the period between two subsequent calls to + * ThreadLocalDirectory.fetch(). + */ + private AGGREGATOR current; + + // see comment on setRegistered(boolean) for locking explanation + private boolean isRegistered = false; + private final Object lock = new Object(); + + LocalInstance(Updater<AGGREGATOR, SAMPLE> updater) { + current = updater.createGenerationInstance(null); + } + + boolean update(SAMPLE x, Updater<AGGREGATOR, SAMPLE> updater) { + synchronized (lock) { + current = updater.update(current, x); + return isRegistered; + } + } + + AGGREGATOR getAndReset(Updater<AGGREGATOR, SAMPLE> updater) { + AGGREGATOR previous; + synchronized (lock) { + previous = current; + current = updater.createGenerationInstance(previous); + setRegistered(false); + } + return previous; + } + + AGGREGATOR copyCurrent(ObservableUpdater<AGGREGATOR, SAMPLE> updater) { + AGGREGATOR view; + synchronized (lock) { + view = updater.copy(current); + } + return view; + } + + // This is either set by the putting thread or the fetching thread. If + // it is set by the putting thread, then there is no memory barrier, + // because it is only _read_ in the putting thread. If it is set by the + // fetching thread, then the memory barrier is this.lock. This + // roundabout way is to avoid creating many-to-many memory barrier and + // locking relationships. + void setRegistered(boolean isRegistered) { + this.isRegistered = isRegistered; + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java new file mode 100644 index 00000000000..339d8002c4f --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java @@ -0,0 +1,86 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import com.yahoo.collections.Tuple2; + +/** + * A class for sending single messages between threads with timeout. Typical use + * would be + * + * <pre> + * Receiver<SomeMessage> receiver = new Receiver<SomeMessage>(); + * SomeRunnable runnable = new SomeRunnable(receiver); + * Thread worker = new Thread(runnable); + * worker.start(); + * Pair<Receiver.MessageState, SomeMessage> answer = receiver.get(500L); + * </pre> + * + * ... and in the worker thread simply + * + * <pre> + * receiver.put(new SomeMessage(...)) + * </pre> + * + * <p> + * Any number of threads may wait for the same message. Sending null references + * is supported. The object is intended for delivering only single message, + * there is no support for recycling it. + * </p> + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public class Receiver<T> { + /** + * MessageState is the reason for returning from get(). If a message is + * received before timeout, the state will be VALID. If no message is + * received before timeout, state is TIMEOUT. + */ + public enum MessageState { + VALID, TIMEOUT; + }; + private final Object lock = new Object(); + private T message = null; + private boolean received = false; + + /** + * Make a message available for consumers. + * + * @param message the message to send + * @throws IllegalStateException if a message has already been received here + */ + public void put(T message) { + synchronized (lock) { + if (received) { + throw new IllegalStateException("Multiple puts on a single Receiver instance is not legal."); + } + this.message = message; + received = true; + lock.notifyAll(); + } + } + + /** + * Wait for up to "timeout" milliseconds for an incoming message. This hides + * spurious wakeup, but InterruptedException will be propagated. + * + * @param timeout + * maximum time to wait for message in milliseconds + * @return a Pair instance containing the reason for returning and the + * message possible received + * @throws InterruptedException if the waiting thread is interrupted + */ + public Tuple2<MessageState, T> get(long timeout) throws InterruptedException { + long barrier = System.currentTimeMillis() + timeout; + synchronized (lock) { + while (!received) { + long t = System.currentTimeMillis(); + if (t >= barrier) { + return new Tuple2<>(MessageState.TIMEOUT, null); + } + lock.wait(barrier - t); + } + return new Tuple2<>(MessageState.VALID, message); + } + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/SystemTimer.java b/vespajlib/src/main/java/com/yahoo/concurrent/SystemTimer.java new file mode 100644 index 00000000000..5aa4990a86a --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/SystemTimer.java @@ -0,0 +1,41 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is an implementation of {@link Timer} that is backed by an actual system timer.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public enum SystemTimer implements Timer {
+
+ INSTANCE;
+
+ private volatile long millis;
+
+ private SystemTimer() {
+ millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ while (true) {
+ millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ @Override
+ public long milliTime() {
+ return millis;
+ }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java new file mode 100644 index 00000000000..5be6da8c66d --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java @@ -0,0 +1,72 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Created with IntelliJ IDEA. + * User: balder + * Date: 24.04.13 + * Time: 19:00 + * To change this template use File | Settings | File Templates. + */ +public class ThreadFactoryFactory { + static public synchronized ThreadFactory getThreadFactory(String name) { + PooledFactory p = factory.get(name); + if (p == null) { + p = new PooledFactory(name); + factory.put(name, p); + } + return p.getFactory(false); + } + static public synchronized ThreadFactory getDaemonThreadFactory(String name) { + PooledFactory p = factory.get(name); + if (p == null) { + p = new PooledFactory(name); + factory.put(name, p); + } + return p.getFactory(true); + } + private static class PooledFactory { + private static class Factory implements ThreadFactory { + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + final boolean isDaemon; + + Factory(final String name, boolean isDaemon) { + this.isDaemon = isDaemon; + final SecurityManager s = System.getSecurityManager(); + group = (s != null) + ? s.getThreadGroup() + : Thread.currentThread().getThreadGroup(); + namePrefix = name; + } + + @Override + public Thread newThread(final Runnable r) { + final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon() != isDaemon) { + t.setDaemon(isDaemon); + } + if (t.getPriority() != Thread.NORM_PRIORITY) { + t.setPriority(Thread.NORM_PRIORITY); + } + return t; + } + } + PooledFactory(String name) { + this.name = name; + } + ThreadFactory getFactory(boolean isDaemon) { + return new Factory(name + "-" + poolId.getAndIncrement() + "-thread-", isDaemon); + + } + private final String name; + private final AtomicInteger poolId = new AtomicInteger(1); + } + static private Map<String, PooledFactory> factory = new HashMap<>(); +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadLocalDirectory.java b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadLocalDirectory.java new file mode 100644 index 00000000000..ef2273bdb25 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadLocalDirectory.java @@ -0,0 +1,346 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import java.util.ArrayList; +import java.util.List; + +/** + * A class for multiple producers and potentially multiple consumers (usually + * only one). + * + * <p> + * The consuming threads always unregisters the data producers when doing + * fetch(). This is the reason for having to do update through the directory. + * The reason for this is otherwise, we would either get reference leaks from + * registered objects belonging to dead threads if we did not unregister + * instances, otherwise the sampling thread would have to unregister the + * instance, and then we would create a memory relationship between all + * producing threads, which is exactly what this class aims to avoid. + * </p> + * + * <p> + * A complete example from a test: + * </p> + * + * <pre> + * private static class SumUpdater implements ThreadLocalDirectory.Updater<Integer, Integer> { + * + * {@literal @}Override + * public Integer update(Integer current, Integer x) { + * return Integer.valueOf(current.intValue() + x.intValue()); + * } + * + * {@literal @}Override + * public Integer createGenerationInstance(Integer previous) { + * return Integer.valueOf(0); + * } + * } + * + * ... then the producers does (where r is in instance of + * ThreadLocalDirectory)... + * + * {@literal @}Override + * public void run() { + * LocalInstance<Integer, Integer> s = r.getLocalInstance(); + * for (int i = 0; i < 500; ++i) { + * r.update(Integer.valueOf(i), s); + * } + * } + * + * ... and the consumer... + * + * List<Integer> measurements = s.fetch() + * </pre> + * + * <p> + * Invoking r.fetch() will produce a list of integers from all the participating + * threads at any time. + * </p> + * + * <p> + * Refer to e.g. com.yahoo.search.statistics.PeakQpsSearcher for a production + * example. + * </p> + * + * @param AGGREGATOR + * the type input data is aggregated into + * @param SAMPLE + * the type of input data + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public final class ThreadLocalDirectory<AGGREGATOR, SAMPLE> { + /** + * Factory interface to create the data container for each generation of + * samples, and putting data into it. + * + * <p> + * The method for actual insertion of a single sample into the current data + * generation exists separate from LocalInstance.AGGREGATOR to make it + * possible to use e.g. Integer and List as AGGREGATOR types. + * </p> + * + * <p> + * The allocation and sampling is placed in the same class, since you always + * need to implement both. + * </p> + * + * @param AGGREGATOR + * The type of the data container to produce + * @param SAMPLE + * The type of the incoming data to store in the container. + */ + public interface Updater<AGGREGATOR, SAMPLE> { + /** + * Create data container to receive produced data. This is invoked once + * on every instance every time ThreadLocalDirectory.fetch() is invoked. + * This might be an empty list, creating a new counter set to zero, or + * even copying the current state of LocalInstance.current. + * LocalInstance.current will be set to the value received from this + * factory after invokation this method. + * + * <p> + * The first time this method is invoked for a thread, previous will be + * null. + * </p> + * + * <p> + * If using mutable objects, an implementation should always create a + * new instance in this method, as the previous data generation will be + * transmitted to the consuming thread. This obviously does not matter + * if using immutable (value) objects. + * </p> + * + * <p> + * Examples: + * </p> + * + * <p> + * Using a mutable aggregator (a list of integers): + * </p> + * + * <pre> + * if (previous == null) { + * return new ArrayList<Integer>(); + * } else { + * return new ArrayList<Integer>(previous.size()); + * } + * </pre> + * + * <p> + * Using an immutable aggregator (an integer): + * </p> + * + * <pre> + * return Integer.valueOf(0); + * </pre> + * + * @return a fresh structure to receive data + */ + public AGGREGATOR createGenerationInstance(AGGREGATOR previous); + + /** + * Insert a data element of type S into the current generation of data + * carrier T. This could be e.g. adding to a list, putting into a local + * histogram or increasing a counter. + * + * <p> + * The method may or may not return a fresh instance of the current + * value for each invokation, if using a mutable aggregator the typical + * case will be returning the same instance for the new and old value of + * current, while if using an immutable aggregator, one is forced to + * return new instances. + * </p> + * + * <p> + * Examples: + * </p> + * + * <p> + * Using a mutable aggregator (a list of instances of type SAMPLE): + * </p> + * + * <pre> + * current.add(x); + * return current; + * </pre> + * + * <p> + * Using an immutable aggregator (Integer) while also using Integer as + * type for SAMPLE: + * </p> + * + * <pre> + * return Integer.valueOf(current.intValue() + x.intValue()); + * </pre> + * + * @param current + * the current generation's data container + * @param x + * the data to insert + * @return the new current value, may be the same as previous + */ + public AGGREGATOR update(AGGREGATOR current, SAMPLE x); + } + + /** + * Implement this interface to be able to view the contents of a + * ThreadLocalDirectory without resetting the local instances in each + * thread. + * + * @param <AGGREGATOR> + * as for {@link Updater} + * @param <SAMPLE> + * as for {@link Updater} + * @see ThreadLocalDirectory#view() + */ + public interface ObservableUpdater<AGGREGATOR, SAMPLE> extends + Updater<AGGREGATOR, SAMPLE> { + /** + * Create an application specific copy of the AGGREGATOR for a thread. + * + * @param current + * the AGGREGATOR instance to copy + * @return a copy of the incoming parameter + */ + public AGGREGATOR copy(AGGREGATOR current); + } + + private final ThreadLocal<LocalInstance<AGGREGATOR, SAMPLE>> local = new ThreadLocal<>(); + private final Object directoryLock = new Object(); + private List<LocalInstance<AGGREGATOR, SAMPLE>> directory = new ArrayList<>(); + private final Updater<AGGREGATOR, SAMPLE> updater; + private final ObservableUpdater<AGGREGATOR, SAMPLE> observableUpdater; + + public ThreadLocalDirectory(Updater<AGGREGATOR, SAMPLE> updater) { + this.updater = updater; + if (updater instanceof ObservableUpdater) { + observableUpdater = (ObservableUpdater<AGGREGATOR, SAMPLE>) updater; + } else { + observableUpdater = null; + } + } + + private void put(LocalInstance<AGGREGATOR, SAMPLE> q) { + // Has to set registered before adding to the list. Otherwise, the + // instance might be removed from the list, set as unregistered, and + // then the local thread might happily remove that information. The Java + // memory model is a guarantuee for the minimum amount of visibility, + // not a definition of the actual amount. + q.setRegistered(true); + synchronized (directoryLock) { + directory.add(q); + } + } + + /** + * Fetch the current set of sampled data, and reset state of all thread + * local instances. The producer threads will not alter data in the list + * returned from this method. + * + * @return a list of data from all producer threads + */ + public List<AGGREGATOR> fetch() { + List<AGGREGATOR> contained; + List<LocalInstance<AGGREGATOR, SAMPLE>> previous; + int previousIntervalSize; + + synchronized (directoryLock) { + previousIntervalSize = directory.size(); + previous = directory; + directory = new ArrayList<>( + previousIntervalSize); + } + contained = new ArrayList<>(previousIntervalSize); + // Yes, this is an inconsistence about when the registered state is + // reset and when the thread local is removed from the list. + // LocalInstance.isRegistered tells whether the data is available to + // some consumer, not whether the LocalInstance is a member of the + // directory. + for (LocalInstance<AGGREGATOR, SAMPLE> x : previous) { + contained.add(x.getAndReset(updater)); + } + return contained; + } + + /** + * Get a view of the current data. This requires this ThreadLocalDirectory + * to have been instantiated with an updater implementing ObservableUpdater. + * + * @return a list of a copy of the current data in all producer threads + * @throws IllegalStateException + * if the updater does not implement {@link ObservableUpdater} + */ + public List<AGGREGATOR> view() { + if (observableUpdater == null) { + throw new IllegalStateException("Does not use observable updaters."); + } + List<LocalInstance<AGGREGATOR, SAMPLE>> current; + List<AGGREGATOR> view; + synchronized (directoryLock) { + current = new ArrayList<>( + directory); + } + view = new ArrayList<>(current.size()); + for (LocalInstance<AGGREGATOR, SAMPLE> x : current) { + view.add(x.copyCurrent(observableUpdater)); + } + return view; + } + + private LocalInstance<AGGREGATOR, SAMPLE> getOrCreateLocal() { + LocalInstance<AGGREGATOR, SAMPLE> current = local.get(); + if (current == null) { + current = new LocalInstance<>(updater); + local.set(current); + } + return current; + } + + /** + * Expose the thread local for the running thread, for use in conjunction + * with update(SAMPLE, LocalInstance<AGGREGATOR, SAMPLE>). + * + * @return the current thread's local instance + */ + public LocalInstance<AGGREGATOR, SAMPLE> getLocalInstance() { + return getOrCreateLocal(); + } + + /** + * Input data from a producer thread. + * + * @param x + * the data to insert + */ + public void update(SAMPLE x) { + update(x, getOrCreateLocal()); + } + + /** + * Update a value with a given thread local instance. + * + * <p> + * If a producer thread is to insert a series of data, it is desirable to + * limit the number of memory transactions to the theoretical minimum. Since + * reading a thread local is the memory equivalence of reading a volatile, + * it is then useful to avoid re-reading the running threads' input + * instance. For this scenario, fetch the running thread's instance with + * getLocalInstance(), and then insert the produced data with the multiple + * calls necessary to update(SAMPLE, LocalInstance<AGGREGATOR, SAMPLE>). + * </p> + * + * @param x + * the data to insert + * @param localInstance + * the local data insertion instance + */ + public void update(SAMPLE x, LocalInstance<AGGREGATOR, SAMPLE> localInstance) { + boolean isRegistered; + isRegistered = localInstance.update(x, updater); + if (!isRegistered) { + put(localInstance); + } + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadRobustList.java b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadRobustList.java new file mode 100644 index 00000000000..8a79db6a6eb --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadRobustList.java @@ -0,0 +1,151 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * A list which tolerates concurrent adds from one other thread while it is + * read. More precisely: <i>This list is guaranteed to provide a self-consistent + * read view regardless of the internal order in which the primitive mutating + * operations on it are observed from the reading thread.</i> + * <p> + * This is useful for traced information as there may be timed out threads + * working on the structure after it is returned upwards for consumption. + * + * @since 4.2 + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class ThreadRobustList<T> implements Iterable<T> { + + private Object[] items; + + /** Index of the next item */ + private int next = 0; + + public ThreadRobustList() { + this(10); + } + + public ThreadRobustList(final int initialCapacity) { + items = new Object[initialCapacity]; + } + + public void add(final T item) { + Object[] workItems = items; + if (next >= items.length) { + final int newLength = 20 + items.length * 2; + workItems = Arrays.copyOf(workItems, newLength); + workItems[next++] = item; + items = workItems; + } else { + workItems[next++] = item; + } + } + + /** + * Returns an iterator over the elements of this. This iterator does not + * support remove. + */ + @Override + public Iterator<T> iterator() { + return new ThreadRobustIterator(items); + } + + /** + * Returns an iterator over the elements of this, starting at the last + * element and working backwards. This iterator does not support remove. + */ + public Iterator<T> reverseIterator() { + return new ThreadRobustReverseIterator(items); + } + + public boolean isEmpty() { + return next == 0; + } + + private class ThreadRobustIterator implements Iterator<T> { + + private final Object[] items; + + private int nextIndex = 0; + + public ThreadRobustIterator(final Object[] items) { + this.items = items; + } + + public @Override + void remove() { + throw new UnsupportedOperationException( + "remove() is not supported on thread robust list iterators"); + } + + @SuppressWarnings("unchecked") + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException("No more elements"); + } + + return (T) items[nextIndex++]; + } + + @Override + public boolean hasNext() { + if (nextIndex >= items.length) { + return false; + } + if (items[nextIndex] == null) { + return false; + } + return true; + } + + } + + private class ThreadRobustReverseIterator implements Iterator<T> { + + private final Object[] items; + + private int nextIndex; + + public ThreadRobustReverseIterator(final Object[] items) { + this.items = items; + nextIndex = findLastAssignedIndex(items); + } + + private int findLastAssignedIndex(final Object[] items) { + for (int i = items.length - 1; i >= 0; i--) { + if (items[i] != null) { + return i; + } + } + return -1; + } + + public @Override + void remove() { + throw new UnsupportedOperationException( + "remove() is not supported on thread robust list iterators"); + } + + @SuppressWarnings("unchecked") + @Override + public T next() { + if (!hasNext()) { + throw new NoSuchElementException("No more elements"); + } + + return (T) items[nextIndex--]; + } + + @Override + public boolean hasNext() { + return nextIndex >= 0; + } + + } + +} diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java b/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java new file mode 100644 index 00000000000..aefbfafb7b1 --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java @@ -0,0 +1,19 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.concurrent;
+
+/**
+ * This interface wraps access to some timer that can be used to measure elapsed time, in milliseconds. This
+ * abstraction allows for unit testing the behavior of time-based constructs.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public interface Timer {
+
+ /**
+ * Returns the current value of some arbitrary timer, in milliseconds. This method can only be used to measure
+ * elapsed time and is not related to any other notion of system or wall-clock time.
+ *
+ * @return The current value of the timer, in milliseconds.
+ */
+ public long milliTime();
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/package-info.java b/vespajlib/src/main/java/com/yahoo/concurrent/package-info.java new file mode 100644 index 00000000000..dd0d639166d --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/package-info.java @@ -0,0 +1,5 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +@ExportPackage +package com.yahoo.concurrent; + +import com.yahoo.osgi.annotation.ExportPackage; |