aboutsummaryrefslogtreecommitdiffstats
path: root/vespajlib/src/main/java/com/yahoo/concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'vespajlib/src/main/java/com/yahoo/concurrent')
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/CopyOnWriteHashMap.java94
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/DaemonThreadFactory.java48
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/EventBarrier.java140
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/LocalInstance.java71
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java86
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/SystemTimer.java41
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java72
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/ThreadLocalDirectory.java346
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/ThreadRobustList.java151
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/Timer.java19
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/package-info.java5
11 files changed, 1073 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/CopyOnWriteHashMap.java b/vespajlib/src/main/java/com/yahoo/concurrent/CopyOnWriteHashMap.java
new file mode 100644
index 00000000000..e15a3734094
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/CopyOnWriteHashMap.java
@@ -0,0 +1,94 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * <p>This is a thread hash map for small collections that are stable once built. Until it is stable there will be a
+ * race among all threads missing something in the map. They will then clone the map add the missing stuff and then put
+ * it back as active again. Here are no locks, but the cost is that inserts will happen a lot more than necessary. The
+ * map reference is volatile, but on most multicpu machines that has no cost unless modified.</p>
+ *
+ * @author <a href="mailto:balder@yahoo-inc.com">Henning Baldersheim</a>
+ * @since 5.2
+ */
+public class CopyOnWriteHashMap<K, V> implements Map<K, V> {
+
+ private volatile HashMap<K, V> map = new HashMap<>();
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return map.containsValue(value);
+ }
+
+ @Override
+ public V get(Object key) {
+ return map.get(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ HashMap<K, V> next = new HashMap<>(map);
+ V old = next.put(key, value);
+ map = next;
+ return old;
+ }
+
+ @Override
+ @SuppressWarnings("SuspiciousMethodCalls")
+ public V remove(Object key) {
+ HashMap<K, V> prev = map;
+ if (!prev.containsKey(key)) {
+ return null;
+ }
+ HashMap<K, V> next = new HashMap<>(prev);
+ V old = next.remove(key);
+ map = next;
+ return old;
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> m) {
+ HashMap<K, V> next = new HashMap<>(map);
+ next.putAll(m);
+ map = next;
+ }
+
+ @Override
+ public void clear() {
+ map = new HashMap<>();
+ }
+
+ @Override
+ public Set<K> keySet() {
+ return map.keySet();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return map.values();
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet() {
+ return map.entrySet();
+ }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/DaemonThreadFactory.java b/vespajlib/src/main/java/com/yahoo/concurrent/DaemonThreadFactory.java
new file mode 100644
index 00000000000..38c5bafc0d6
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/DaemonThreadFactory.java
@@ -0,0 +1,48 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * A simple thread factory that decorates <code>Executors.defaultThreadFactory()</code>
+ * and sets all created threads to be daemon threads.
+ *
+ * @author <a href="mailto:einarmr@yahoo-inc.com">Einar M R Rosenvinge</a>
+ */
+public class DaemonThreadFactory implements ThreadFactory {
+ private ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+ private String prefix = null;
+
+ /**
+ * Creates a deamon thread factory that creates threads with the default names
+ * provided by <code>Executors.defaultThreadFactory()</code>.
+ */
+ public DaemonThreadFactory() {
+ }
+
+ /**
+ * Creates a deamon thread factory that creates threads with the default names
+ * provided by <code>Executors.defaultThreadFactory()</code> prepended by the
+ * specified prefix.
+ *
+ * @param prefix the thread name prefix to use
+ */
+ public DaemonThreadFactory(String prefix) {
+ this.prefix = prefix;
+ }
+
+ public String getPrefix() {
+ return prefix;
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread t = defaultThreadFactory.newThread(runnable);
+ t.setDaemon(true);
+ if (prefix != null) {
+ t.setName(prefix + t.getName());
+ }
+ return t;
+ }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/EventBarrier.java b/vespajlib/src/main/java/com/yahoo/concurrent/EventBarrier.java
new file mode 100644
index 00000000000..389fe8a85ea
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/EventBarrier.java
@@ -0,0 +1,140 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Reference implementation of the 'Incremental Minimal Event Barrier'
+ * algorithm. An event in this context is defined to be something that
+ * happens during a time interval. An event barrier is a time interval
+ * for which events may start before or end after, but not both. The
+ * problem solved by the algorithm is to determine the minimal event
+ * barrier starting at a given time. In other words; wait for the
+ * currently active events to complete. The most natural use of this
+ * algorithm would be to make a thread wait for events happening in
+ * other threads to complete.
+ *
+ * @author <a href="mailto:havardpe@yahoo-inc.com">Haavard Pettersen</a>
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public class EventBarrier {
+
+ private final List<Entry> queue = new LinkedList<>();
+ private int barrierToken = 0;
+ private int eventCount = 0;
+
+ /**
+ * At creation there are no active events and no pending barriers.
+ */
+ public EventBarrier() {
+ // empty
+ }
+
+ /**
+ * Obtain the current number of active events. This method is
+ * intended for testing and debugging.
+ *
+ * @return Number of active events.
+ */
+ int getNumEvents() {
+ int cnt = eventCount;
+ for (Entry entry : queue) {
+ cnt += entry.eventCount;
+ }
+ return cnt;
+ }
+
+ /**
+ * Obtain the current number of pending barriers. This method is
+ * intended for testing and debugging.
+ *
+ * @return Number of pending barriers.
+ */
+ int getNumBarriers() {
+ return queue.size();
+ }
+
+ /**
+ * Signal the start of an event. The value returned from this
+ * method must later be passed to the completeEvent method when
+ * signaling the completion of the event.
+ *
+ * @return Opaque token identifying the started event.
+ */
+ public int startEvent() {
+ ++eventCount;
+ return barrierToken;
+ }
+
+ /**
+ * Signal the completion of an event. The value passed to this
+ * method must be the same as the return value previously obtained
+ * from the startEvent method. This method will signal the
+ * completion of all pending barriers that were completed by the
+ * completion of this event.
+ *
+ * @param token Opaque token identifying the completed event.
+ */
+ public void completeEvent(int token) {
+ if (token == this.barrierToken) {
+ --eventCount;
+ return;
+ }
+ --queue.get(queue.size() - (this.barrierToken - token)).eventCount;
+ while (!queue.isEmpty() && queue.get(0).eventCount == 0) {
+ queue.remove(0).handler.completeBarrier();
+ }
+ }
+
+ /**
+ * Initiate the detection of the minimal event barrier starting
+ * now. If this method returns false it means that no events were
+ * currently active and the minimal event barrier was infinitely
+ * small. If this method returns false the handler will not be
+ * notified of the completion of the barrier. If this method
+ * returns true it means that the started barrier is pending and
+ * that the handler passed to this method will be notified of its
+ * completion at a later time.
+ *
+ * @param handler Handler notified of the completion of the barrier.
+ * @return True if a barrier was started, false if no events were active.
+ */
+ public boolean startBarrier(BarrierWaiter handler) {
+ if (eventCount == 0 && queue.isEmpty()) {
+ return false;
+ }
+ queue.add(new Entry(eventCount, handler));
+ ++barrierToken;
+ eventCount = 0;
+ return true;
+ }
+
+ /**
+ * Declares the interface required to wait for the detection of a
+ * minimal event barrier. An object that implements this is passed
+ * to the {@link EventBarrier#startBarrier(BarrierWaiter)}.
+ */
+ public interface BarrierWaiter {
+
+ /**
+ * Callback invoked by the thread that detected the minimal
+ * event barrier. Once this is called, all events taking place
+ * at or before the corresponding call to {@link
+ * EventBarrier#startBarrier(BarrierWaiter)} have ended.
+ */
+ public void completeBarrier();
+ }
+
+ private static class Entry {
+
+ int eventCount;
+ final BarrierWaiter handler;
+
+ Entry(int eventCount, BarrierWaiter handler) {
+ this.eventCount = eventCount;
+ this.handler = handler;
+ }
+ }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/LocalInstance.java b/vespajlib/src/main/java/com/yahoo/concurrent/LocalInstance.java
new file mode 100644
index 00000000000..c2d19831810
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/LocalInstance.java
@@ -0,0 +1,71 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import com.yahoo.concurrent.ThreadLocalDirectory.ObservableUpdater;
+import com.yahoo.concurrent.ThreadLocalDirectory.Updater;
+
+/**
+ * Only for use along with ThreadLocalDirectory. A thread local data container
+ * instance. The class is visible to avoid indirection through the internal
+ * {@link ThreadLocal} in ThreadLocalDirectory if possible, but has no user
+ * available methods.
+ *
+ * @param AGGREGATOR
+ * the structure to insert produced data into
+ * @param SAMPLE
+ * type of produced data to insert from each participating thread
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public final class LocalInstance<AGGREGATOR, SAMPLE> {
+ /**
+ * The current generation of data produced from a single thread, where
+ * generation is the period between two subsequent calls to
+ * ThreadLocalDirectory.fetch().
+ */
+ private AGGREGATOR current;
+
+ // see comment on setRegistered(boolean) for locking explanation
+ private boolean isRegistered = false;
+ private final Object lock = new Object();
+
+ LocalInstance(Updater<AGGREGATOR, SAMPLE> updater) {
+ current = updater.createGenerationInstance(null);
+ }
+
+ boolean update(SAMPLE x, Updater<AGGREGATOR, SAMPLE> updater) {
+ synchronized (lock) {
+ current = updater.update(current, x);
+ return isRegistered;
+ }
+ }
+
+ AGGREGATOR getAndReset(Updater<AGGREGATOR, SAMPLE> updater) {
+ AGGREGATOR previous;
+ synchronized (lock) {
+ previous = current;
+ current = updater.createGenerationInstance(previous);
+ setRegistered(false);
+ }
+ return previous;
+ }
+
+ AGGREGATOR copyCurrent(ObservableUpdater<AGGREGATOR, SAMPLE> updater) {
+ AGGREGATOR view;
+ synchronized (lock) {
+ view = updater.copy(current);
+ }
+ return view;
+ }
+
+ // This is either set by the putting thread or the fetching thread. If
+ // it is set by the putting thread, then there is no memory barrier,
+ // because it is only _read_ in the putting thread. If it is set by the
+ // fetching thread, then the memory barrier is this.lock. This
+ // roundabout way is to avoid creating many-to-many memory barrier and
+ // locking relationships.
+ void setRegistered(boolean isRegistered) {
+ this.isRegistered = isRegistered;
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java
new file mode 100644
index 00000000000..339d8002c4f
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/Receiver.java
@@ -0,0 +1,86 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import com.yahoo.collections.Tuple2;
+
+/**
+ * A class for sending single messages between threads with timeout. Typical use
+ * would be
+ *
+ * <pre>
+ * Receiver&lt;SomeMessage&gt; receiver = new Receiver&lt;SomeMessage&gt;();
+ * SomeRunnable runnable = new SomeRunnable(receiver);
+ * Thread worker = new Thread(runnable);
+ * worker.start();
+ * Pair&lt;Receiver.MessageState, SomeMessage&gt; answer = receiver.get(500L);
+ * </pre>
+ *
+ * ... and in the worker thread simply
+ *
+ * <pre>
+ * receiver.put(new SomeMessage(...))
+ * </pre>
+ *
+ * <p>
+ * Any number of threads may wait for the same message. Sending null references
+ * is supported. The object is intended for delivering only single message,
+ * there is no support for recycling it.
+ * </p>
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class Receiver<T> {
+ /**
+ * MessageState is the reason for returning from get(). If a message is
+ * received before timeout, the state will be VALID. If no message is
+ * received before timeout, state is TIMEOUT.
+ */
+ public enum MessageState {
+ VALID, TIMEOUT;
+ };
+ private final Object lock = new Object();
+ private T message = null;
+ private boolean received = false;
+
+ /**
+ * Make a message available for consumers.
+ *
+ * @param message the message to send
+ * @throws IllegalStateException if a message has already been received here
+ */
+ public void put(T message) {
+ synchronized (lock) {
+ if (received) {
+ throw new IllegalStateException("Multiple puts on a single Receiver instance is not legal.");
+ }
+ this.message = message;
+ received = true;
+ lock.notifyAll();
+ }
+ }
+
+ /**
+ * Wait for up to "timeout" milliseconds for an incoming message. This hides
+ * spurious wakeup, but InterruptedException will be propagated.
+ *
+ * @param timeout
+ * maximum time to wait for message in milliseconds
+ * @return a Pair instance containing the reason for returning and the
+ * message possible received
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ public Tuple2<MessageState, T> get(long timeout) throws InterruptedException {
+ long barrier = System.currentTimeMillis() + timeout;
+ synchronized (lock) {
+ while (!received) {
+ long t = System.currentTimeMillis();
+ if (t >= barrier) {
+ return new Tuple2<>(MessageState.TIMEOUT, null);
+ }
+ lock.wait(barrier - t);
+ }
+ return new Tuple2<>(MessageState.VALID, message);
+ }
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/SystemTimer.java b/vespajlib/src/main/java/com/yahoo/concurrent/SystemTimer.java
new file mode 100644
index 00000000000..5aa4990a86a
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/SystemTimer.java
@@ -0,0 +1,41 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is an implementation of {@link Timer} that is backed by an actual system timer.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public enum SystemTimer implements Timer {
+
+ INSTANCE;
+
+ private volatile long millis;
+
+ private SystemTimer() {
+ millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ Thread thread = new Thread() {
+
+ @Override
+ public void run() {
+ while (true) {
+ millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ @Override
+ public long milliTime() {
+ return millis;
+ }
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java
new file mode 100644
index 00000000000..5be6da8c66d
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadFactoryFactory.java
@@ -0,0 +1,72 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: balder
+ * Date: 24.04.13
+ * Time: 19:00
+ * To change this template use File | Settings | File Templates.
+ */
+public class ThreadFactoryFactory {
+ static public synchronized ThreadFactory getThreadFactory(String name) {
+ PooledFactory p = factory.get(name);
+ if (p == null) {
+ p = new PooledFactory(name);
+ factory.put(name, p);
+ }
+ return p.getFactory(false);
+ }
+ static public synchronized ThreadFactory getDaemonThreadFactory(String name) {
+ PooledFactory p = factory.get(name);
+ if (p == null) {
+ p = new PooledFactory(name);
+ factory.put(name, p);
+ }
+ return p.getFactory(true);
+ }
+ private static class PooledFactory {
+ private static class Factory implements ThreadFactory {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix;
+ final boolean isDaemon;
+
+ Factory(final String name, boolean isDaemon) {
+ this.isDaemon = isDaemon;
+ final SecurityManager s = System.getSecurityManager();
+ group = (s != null)
+ ? s.getThreadGroup()
+ : Thread.currentThread().getThreadGroup();
+ namePrefix = name;
+ }
+
+ @Override
+ public Thread newThread(final Runnable r) {
+ final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+ if (t.isDaemon() != isDaemon) {
+ t.setDaemon(isDaemon);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+ }
+ PooledFactory(String name) {
+ this.name = name;
+ }
+ ThreadFactory getFactory(boolean isDaemon) {
+ return new Factory(name + "-" + poolId.getAndIncrement() + "-thread-", isDaemon);
+
+ }
+ private final String name;
+ private final AtomicInteger poolId = new AtomicInteger(1);
+ }
+ static private Map<String, PooledFactory> factory = new HashMap<>();
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadLocalDirectory.java b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadLocalDirectory.java
new file mode 100644
index 00000000000..ef2273bdb25
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadLocalDirectory.java
@@ -0,0 +1,346 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A class for multiple producers and potentially multiple consumers (usually
+ * only one).
+ *
+ * <p>
+ * The consuming threads always unregisters the data producers when doing
+ * fetch(). This is the reason for having to do update through the directory.
+ * The reason for this is otherwise, we would either get reference leaks from
+ * registered objects belonging to dead threads if we did not unregister
+ * instances, otherwise the sampling thread would have to unregister the
+ * instance, and then we would create a memory relationship between all
+ * producing threads, which is exactly what this class aims to avoid.
+ * </p>
+ *
+ * <p>
+ * A complete example from a test:
+ * </p>
+ *
+ * <pre>
+ * private static class SumUpdater implements ThreadLocalDirectory.Updater&lt;Integer, Integer&gt; {
+ *
+ * {@literal @}Override
+ * public Integer update(Integer current, Integer x) {
+ * return Integer.valueOf(current.intValue() + x.intValue());
+ * }
+ *
+ * {@literal @}Override
+ * public Integer createGenerationInstance(Integer previous) {
+ * return Integer.valueOf(0);
+ * }
+ * }
+ *
+ * ... then the producers does (where r is in instance of
+ * ThreadLocalDirectory)...
+ *
+ * {@literal @}Override
+ * public void run() {
+ * LocalInstance&lt;Integer, Integer&gt; s = r.getLocalInstance();
+ * for (int i = 0; i &lt; 500; ++i) {
+ * r.update(Integer.valueOf(i), s);
+ * }
+ * }
+ *
+ * ... and the consumer...
+ *
+ * List&lt;Integer&gt; measurements = s.fetch()
+ * </pre>
+ *
+ * <p>
+ * Invoking r.fetch() will produce a list of integers from all the participating
+ * threads at any time.
+ * </p>
+ *
+ * <p>
+ * Refer to e.g. com.yahoo.search.statistics.PeakQpsSearcher for a production
+ * example.
+ * </p>
+ *
+ * @param AGGREGATOR
+ * the type input data is aggregated into
+ * @param SAMPLE
+ * the type of input data
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public final class ThreadLocalDirectory<AGGREGATOR, SAMPLE> {
+ /**
+ * Factory interface to create the data container for each generation of
+ * samples, and putting data into it.
+ *
+ * <p>
+ * The method for actual insertion of a single sample into the current data
+ * generation exists separate from LocalInstance.AGGREGATOR to make it
+ * possible to use e.g. Integer and List as AGGREGATOR types.
+ * </p>
+ *
+ * <p>
+ * The allocation and sampling is placed in the same class, since you always
+ * need to implement both.
+ * </p>
+ *
+ * @param AGGREGATOR
+ * The type of the data container to produce
+ * @param SAMPLE
+ * The type of the incoming data to store in the container.
+ */
+ public interface Updater<AGGREGATOR, SAMPLE> {
+ /**
+ * Create data container to receive produced data. This is invoked once
+ * on every instance every time ThreadLocalDirectory.fetch() is invoked.
+ * This might be an empty list, creating a new counter set to zero, or
+ * even copying the current state of LocalInstance.current.
+ * LocalInstance.current will be set to the value received from this
+ * factory after invokation this method.
+ *
+ * <p>
+ * The first time this method is invoked for a thread, previous will be
+ * null.
+ * </p>
+ *
+ * <p>
+ * If using mutable objects, an implementation should always create a
+ * new instance in this method, as the previous data generation will be
+ * transmitted to the consuming thread. This obviously does not matter
+ * if using immutable (value) objects.
+ * </p>
+ *
+ * <p>
+ * Examples:
+ * </p>
+ *
+ * <p>
+ * Using a mutable aggregator (a list of integers):
+ * </p>
+ *
+ * <pre>
+ * if (previous == null) {
+ * return new ArrayList&lt;Integer&gt;();
+ * } else {
+ * return new ArrayList&lt;Integer&gt;(previous.size());
+ * }
+ * </pre>
+ *
+ * <p>
+ * Using an immutable aggregator (an integer):
+ * </p>
+ *
+ * <pre>
+ * return Integer.valueOf(0);
+ * </pre>
+ *
+ * @return a fresh structure to receive data
+ */
+ public AGGREGATOR createGenerationInstance(AGGREGATOR previous);
+
+ /**
+ * Insert a data element of type S into the current generation of data
+ * carrier T. This could be e.g. adding to a list, putting into a local
+ * histogram or increasing a counter.
+ *
+ * <p>
+ * The method may or may not return a fresh instance of the current
+ * value for each invokation, if using a mutable aggregator the typical
+ * case will be returning the same instance for the new and old value of
+ * current, while if using an immutable aggregator, one is forced to
+ * return new instances.
+ * </p>
+ *
+ * <p>
+ * Examples:
+ * </p>
+ *
+ * <p>
+ * Using a mutable aggregator (a list of instances of type SAMPLE):
+ * </p>
+ *
+ * <pre>
+ * current.add(x);
+ * return current;
+ * </pre>
+ *
+ * <p>
+ * Using an immutable aggregator (Integer) while also using Integer as
+ * type for SAMPLE:
+ * </p>
+ *
+ * <pre>
+ * return Integer.valueOf(current.intValue() + x.intValue());
+ * </pre>
+ *
+ * @param current
+ * the current generation's data container
+ * @param x
+ * the data to insert
+ * @return the new current value, may be the same as previous
+ */
+ public AGGREGATOR update(AGGREGATOR current, SAMPLE x);
+ }
+
+ /**
+ * Implement this interface to be able to view the contents of a
+ * ThreadLocalDirectory without resetting the local instances in each
+ * thread.
+ *
+ * @param <AGGREGATOR>
+ * as for {@link Updater}
+ * @param <SAMPLE>
+ * as for {@link Updater}
+ * @see ThreadLocalDirectory#view()
+ */
+ public interface ObservableUpdater<AGGREGATOR, SAMPLE> extends
+ Updater<AGGREGATOR, SAMPLE> {
+ /**
+ * Create an application specific copy of the AGGREGATOR for a thread.
+ *
+ * @param current
+ * the AGGREGATOR instance to copy
+ * @return a copy of the incoming parameter
+ */
+ public AGGREGATOR copy(AGGREGATOR current);
+ }
+
+ private final ThreadLocal<LocalInstance<AGGREGATOR, SAMPLE>> local = new ThreadLocal<>();
+ private final Object directoryLock = new Object();
+ private List<LocalInstance<AGGREGATOR, SAMPLE>> directory = new ArrayList<>();
+ private final Updater<AGGREGATOR, SAMPLE> updater;
+ private final ObservableUpdater<AGGREGATOR, SAMPLE> observableUpdater;
+
+ public ThreadLocalDirectory(Updater<AGGREGATOR, SAMPLE> updater) {
+ this.updater = updater;
+ if (updater instanceof ObservableUpdater) {
+ observableUpdater = (ObservableUpdater<AGGREGATOR, SAMPLE>) updater;
+ } else {
+ observableUpdater = null;
+ }
+ }
+
+ private void put(LocalInstance<AGGREGATOR, SAMPLE> q) {
+ // Has to set registered before adding to the list. Otherwise, the
+ // instance might be removed from the list, set as unregistered, and
+ // then the local thread might happily remove that information. The Java
+ // memory model is a guarantuee for the minimum amount of visibility,
+ // not a definition of the actual amount.
+ q.setRegistered(true);
+ synchronized (directoryLock) {
+ directory.add(q);
+ }
+ }
+
+ /**
+ * Fetch the current set of sampled data, and reset state of all thread
+ * local instances. The producer threads will not alter data in the list
+ * returned from this method.
+ *
+ * @return a list of data from all producer threads
+ */
+ public List<AGGREGATOR> fetch() {
+ List<AGGREGATOR> contained;
+ List<LocalInstance<AGGREGATOR, SAMPLE>> previous;
+ int previousIntervalSize;
+
+ synchronized (directoryLock) {
+ previousIntervalSize = directory.size();
+ previous = directory;
+ directory = new ArrayList<>(
+ previousIntervalSize);
+ }
+ contained = new ArrayList<>(previousIntervalSize);
+ // Yes, this is an inconsistence about when the registered state is
+ // reset and when the thread local is removed from the list.
+ // LocalInstance.isRegistered tells whether the data is available to
+ // some consumer, not whether the LocalInstance is a member of the
+ // directory.
+ for (LocalInstance<AGGREGATOR, SAMPLE> x : previous) {
+ contained.add(x.getAndReset(updater));
+ }
+ return contained;
+ }
+
+ /**
+ * Get a view of the current data. This requires this ThreadLocalDirectory
+ * to have been instantiated with an updater implementing ObservableUpdater.
+ *
+ * @return a list of a copy of the current data in all producer threads
+ * @throws IllegalStateException
+ * if the updater does not implement {@link ObservableUpdater}
+ */
+ public List<AGGREGATOR> view() {
+ if (observableUpdater == null) {
+ throw new IllegalStateException("Does not use observable updaters.");
+ }
+ List<LocalInstance<AGGREGATOR, SAMPLE>> current;
+ List<AGGREGATOR> view;
+ synchronized (directoryLock) {
+ current = new ArrayList<>(
+ directory);
+ }
+ view = new ArrayList<>(current.size());
+ for (LocalInstance<AGGREGATOR, SAMPLE> x : current) {
+ view.add(x.copyCurrent(observableUpdater));
+ }
+ return view;
+ }
+
+ private LocalInstance<AGGREGATOR, SAMPLE> getOrCreateLocal() {
+ LocalInstance<AGGREGATOR, SAMPLE> current = local.get();
+ if (current == null) {
+ current = new LocalInstance<>(updater);
+ local.set(current);
+ }
+ return current;
+ }
+
+ /**
+ * Expose the thread local for the running thread, for use in conjunction
+ * with update(SAMPLE, LocalInstance&lt;AGGREGATOR, SAMPLE&gt;).
+ *
+ * @return the current thread's local instance
+ */
+ public LocalInstance<AGGREGATOR, SAMPLE> getLocalInstance() {
+ return getOrCreateLocal();
+ }
+
+ /**
+ * Input data from a producer thread.
+ *
+ * @param x
+ * the data to insert
+ */
+ public void update(SAMPLE x) {
+ update(x, getOrCreateLocal());
+ }
+
+ /**
+ * Update a value with a given thread local instance.
+ *
+ * <p>
+ * If a producer thread is to insert a series of data, it is desirable to
+ * limit the number of memory transactions to the theoretical minimum. Since
+ * reading a thread local is the memory equivalence of reading a volatile,
+ * it is then useful to avoid re-reading the running threads' input
+ * instance. For this scenario, fetch the running thread's instance with
+ * getLocalInstance(), and then insert the produced data with the multiple
+ * calls necessary to update(SAMPLE, LocalInstance&lt;AGGREGATOR, SAMPLE&gt;).
+ * </p>
+ *
+ * @param x
+ * the data to insert
+ * @param localInstance
+ * the local data insertion instance
+ */
+ public void update(SAMPLE x, LocalInstance<AGGREGATOR, SAMPLE> localInstance) {
+ boolean isRegistered;
+ isRegistered = localInstance.update(x, updater);
+ if (!isRegistered) {
+ put(localInstance);
+ }
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/ThreadRobustList.java b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadRobustList.java
new file mode 100644
index 00000000000..8a79db6a6eb
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/ThreadRobustList.java
@@ -0,0 +1,151 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A list which tolerates concurrent adds from one other thread while it is
+ * read. More precisely: <i>This list is guaranteed to provide a self-consistent
+ * read view regardless of the internal order in which the primitive mutating
+ * operations on it are observed from the reading thread.</i>
+ * <p>
+ * This is useful for traced information as there may be timed out threads
+ * working on the structure after it is returned upwards for consumption.
+ *
+ * @since 4.2
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class ThreadRobustList<T> implements Iterable<T> {
+
+ private Object[] items;
+
+ /** Index of the next item */
+ private int next = 0;
+
+ public ThreadRobustList() {
+ this(10);
+ }
+
+ public ThreadRobustList(final int initialCapacity) {
+ items = new Object[initialCapacity];
+ }
+
+ public void add(final T item) {
+ Object[] workItems = items;
+ if (next >= items.length) {
+ final int newLength = 20 + items.length * 2;
+ workItems = Arrays.copyOf(workItems, newLength);
+ workItems[next++] = item;
+ items = workItems;
+ } else {
+ workItems[next++] = item;
+ }
+ }
+
+ /**
+ * Returns an iterator over the elements of this. This iterator does not
+ * support remove.
+ */
+ @Override
+ public Iterator<T> iterator() {
+ return new ThreadRobustIterator(items);
+ }
+
+ /**
+ * Returns an iterator over the elements of this, starting at the last
+ * element and working backwards. This iterator does not support remove.
+ */
+ public Iterator<T> reverseIterator() {
+ return new ThreadRobustReverseIterator(items);
+ }
+
+ public boolean isEmpty() {
+ return next == 0;
+ }
+
+ private class ThreadRobustIterator implements Iterator<T> {
+
+ private final Object[] items;
+
+ private int nextIndex = 0;
+
+ public ThreadRobustIterator(final Object[] items) {
+ this.items = items;
+ }
+
+ public @Override
+ void remove() {
+ throw new UnsupportedOperationException(
+ "remove() is not supported on thread robust list iterators");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements");
+ }
+
+ return (T) items[nextIndex++];
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (nextIndex >= items.length) {
+ return false;
+ }
+ if (items[nextIndex] == null) {
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ private class ThreadRobustReverseIterator implements Iterator<T> {
+
+ private final Object[] items;
+
+ private int nextIndex;
+
+ public ThreadRobustReverseIterator(final Object[] items) {
+ this.items = items;
+ nextIndex = findLastAssignedIndex(items);
+ }
+
+ private int findLastAssignedIndex(final Object[] items) {
+ for (int i = items.length - 1; i >= 0; i--) {
+ if (items[i] != null) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public @Override
+ void remove() {
+ throw new UnsupportedOperationException(
+ "remove() is not supported on thread robust list iterators");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more elements");
+ }
+
+ return (T) items[nextIndex--];
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextIndex >= 0;
+ }
+
+ }
+
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java b/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java
new file mode 100644
index 00000000000..aefbfafb7b1
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/Timer.java
@@ -0,0 +1,19 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.concurrent;
+
+/**
+ * This interface wraps access to some timer that can be used to measure elapsed time, in milliseconds. This
+ * abstraction allows for unit testing the behavior of time-based constructs.
+ *
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
+ */
+public interface Timer {
+
+ /**
+ * Returns the current value of some arbitrary timer, in milliseconds. This method can only be used to measure
+ * elapsed time and is not related to any other notion of system or wall-clock time.
+ *
+ * @return The current value of the timer, in milliseconds.
+ */
+ public long milliTime();
+}
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/package-info.java b/vespajlib/src/main/java/com/yahoo/concurrent/package-info.java
new file mode 100644
index 00000000000..dd0d639166d
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/package-info.java
@@ -0,0 +1,5 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+package com.yahoo.concurrent;
+
+import com.yahoo.osgi.annotation.ExportPackage;