// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.concurrent;

import java.util.ArrayList;
import java.util.List;

/**
 * A class for multiple producers and potentially multiple consumers (usually
 * only one).
 *
 * The consuming thread always unregisters the data producers when doing
 * fetch(). This is why updates have to go through the directory. Both
 * alternatives are worse: if instances were never unregistered, we would
 * get reference leaks from registered objects belonging to dead threads;
 * if the sampling thread had to unregister the instance, we would create
 * a memory relationship between all producing threads, which is exactly
 * what this class aims to avoid.
 *
 * A complete example from a test:
 *
 *     private static class SumUpdater implements ThreadLocalDirectory.Updater<Integer, Integer> {
 *
 *         {@literal @}Override
 *         public Integer update(Integer current, Integer x) {
 *             return Integer.valueOf(current.intValue() + x.intValue());
 *         }
 *
 *         {@literal @}Override
 *         public Integer createGenerationInstance(Integer previous) {
 *             return Integer.valueOf(0);
 *         }
 *     }
 *
 * ... then the producers do (where r is an instance of
 * ThreadLocalDirectory)...
 *
 *     {@literal @}Override
 *     public void run() {
 *         LocalInstance<Integer, Integer> s = r.getLocalInstance();
 *         for (int i = 0; i < 500; ++i) {
 *             r.update(Integer.valueOf(i), s);
 *         }
 *     }
 *
 * ... and the consumer...
 *
 *     List<Integer> measurements = r.fetch();
 *
 * Invoking r.fetch() will produce a list of integers from all the participating
 * threads at any time.
 *
 * Refer to e.g. com.yahoo.search.statistics.PeakQpsSearcher for a production
 * example.
 *
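 * To make the unregister-on-fetch idea above concrete, here is a minimal
 * sketch of the pattern. It is a simplified, hypothetical stand-in and not
 * the implementation of this class: the names MiniDirectory and Local, the
 * fixed Integer sample type and the use of plain synchronized blocks are all
 * assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in; not the real ThreadLocalDirectory.
public class MiniDirectory {

    // Per-thread holder. Its own monitor guards both fields.
    static final class Local {
        private List<Integer> current = new ArrayList<>();
        private boolean registered = false;
    }

    private final ThreadLocal<Local> local = ThreadLocal.withInitial(Local::new);
    private final Object directoryLock = new Object();
    private List<Local> directory = new ArrayList<>();

    public Local getLocalInstance() {
        return local.get();
    }

    // Updates go through the directory so that a holder which the consumer
    // has unregistered can register itself again.
    public void update(int x, Local l) {
        boolean mustRegister;
        synchronized (l) {
            l.current.add(x);
            mustRegister = !l.registered;
            l.registered = true;
        }
        if (mustRegister) {
            synchronized (directoryLock) {
                directory.add(l);
            }
        }
    }

    // The consumer takes all registered holders and unregisters them, so
    // holders of dead threads are not referenced after this call.
    public List<List<Integer>> fetch() {
        List<Local> taken;
        synchronized (directoryLock) {
            taken = directory;
            directory = new ArrayList<>();
        }
        List<List<Integer>> result = new ArrayList<>();
        for (Local l : taken) {
            synchronized (l) {
                result.add(l.current);
                l.current = new ArrayList<>();
                l.registered = false;
            }
        }
        return result;
    }
}
```

 * In this sketch a producer only contends with the consumer on its own
 * holder, plus one short directory lock when it has to register again.
 * A holder whose thread has died drops out of the directory at the next
 * fetch(), since nothing registers it again.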
* * @param* The method for actual insertion of a single sample into the current data * generation exists separate from LocalInstance.AGGREGATOR to make it * possible to use e.g. Integer and List as AGGREGATOR types. *
         * The allocation and sampling are placed in the same class, since you always
         * need to implement both.
         *
* * @param* The first time this method is invoked for a thread, previous will be * null. *
         * If using mutable objects, an implementation should always create a
         * new instance in this method, as the previous data generation will be
         * transmitted to the consuming thread. This obviously does not matter
         * if using immutable (value) objects.
         *
         * Examples:
         *
         * Using a mutable aggregator (a list of integers):
         *
         *     if (previous == null) {
         *         return new ArrayList<Integer>();
         *     } else {
         *         return new ArrayList<Integer>(previous.size());
         *     }
         *
         * Using an immutable aggregator (an integer):
         *
         *     return Integer.valueOf(0);
         *
         * @return a fresh structure to receive data
         */
        AGGREGATOR createGenerationInstance(AGGREGATOR previous);

        /**
         * Insert a data element of type SAMPLE into the current generation of
         * data carrier AGGREGATOR. This could be e.g. adding to a list, putting
         * into a local histogram or increasing a counter.
         *
         * The method may or may not return a fresh instance of the current
         * value for each invocation. With a mutable aggregator, the typical
         * case is returning the same instance for the new and old value of
         * current; with an immutable aggregator, one is forced to return new
         * instances.
         *
         * Examples:
         *
         * Using a mutable aggregator (a list of instances of type SAMPLE):
         *
         *     current.add(x);
         *     return current;
         *
         * Using an immutable aggregator (Integer) while also using Integer as
         * type for SAMPLE:
         *
         *     return Integer.valueOf(current.intValue() + x.intValue());
         *
         * @param current
         *            the current generation's data container
         * @param x
         *            the data to insert
         * @return the new current value, may be the same as previous
         */
        AGGREGATOR update(AGGREGATOR current, SAMPLE x);
    }

    /**
     * Implement this interface to be able to view the contents of a
     * ThreadLocalDirectory without resetting the local instances in each
     * thread.
     *
     * @param
     * If a producer thread is to insert a series of data, it is desirable to
     * limit the number of memory transactions to the theoretical minimum. Since
     * reading a thread local is the memory equivalent of reading a volatile,
     * it is useful to avoid re-reading the running thread's input instance.
     * For this scenario, fetch the running thread's instance once with
     * getLocalInstance(), and then insert the produced data with as many
     * calls to update(SAMPLE, LocalInstance<AGGREGATOR, SAMPLE>) as necessary.
     *
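     * As a rough illustration of the advice above, the following sketch uses
     * a plain java.lang.ThreadLocal as a stand-in for the directory and
     * contrasts looking up the thread's instance on every insertion with
     * looking it up once. The class HoistedLocalSketch and its method names
     * are hypothetical and exist only for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class HoistedLocalSketch {

    // Hypothetical stand-in for a thread's data insertion instance.
    private static final ThreadLocal<List<Integer>> LOCAL =
            ThreadLocal.withInitial(ArrayList::new);

    // Looks up the thread local on every insertion: n lookups in the loop.
    static List<Integer> insertWithRepeatedLookup(int n) {
        for (int i = 0; i < n; ++i) {
            LOCAL.get().add(i);
        }
        return LOCAL.get();
    }

    // Looks up the thread local once and reuses the reference in the loop.
    static List<Integer> insertWithSingleLookup(int n) {
        List<Integer> mine = LOCAL.get();
        for (int i = 0; i < n; ++i) {
            mine.add(i);
        }
        return mine;
    }
}
```

     * Both methods insert into the same per-thread list. The second one
     * corresponds to calling getLocalInstance() once before the loop and
     * passing the result to every update call.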
     * @param x the data to insert
     * @param localInstance the local data insertion instance
     */
    public void update(SAMPLE x, LocalInstance