summaryrefslogtreecommitdiffstats
path: root/vespajlib
diff options
context:
space:
mode:
authorJon Marius Venstad <jvenstad@yahoo-inc.com>2019-05-24 13:24:44 +0200
committerJon Marius Venstad <jvenstad@yahoo-inc.com>2019-05-24 15:55:22 +0200
commita9d67ff3662ef5020ce51197b698289753a83399 (patch)
tree4e5abb2e43639e9849002423df946c41dc59e242 /vespajlib
parenta159b4ce190a42e2c25ad5fae0e010d947bbe89c (diff)
Add StripedExecutor for serialising tasks per key
Diffstat (limited to 'vespajlib')
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java106
-rw-r--r--vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java44
2 files changed, 150 insertions, 0 deletions
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java b/vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java
new file mode 100644
index 00000000000..43a170230ad
--- /dev/null
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java
@@ -0,0 +1,106 @@
+package com.yahoo.concurrent;
+
+import com.yahoo.yolean.Exceptions;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Executor that serializes runnables with the same key, but may parallelize over different keys.
+ *
+ * @author jonmv
+ */
+public class StripedExecutor<Key> {
+
+ private static final Logger logger = Logger.getLogger(StripedExecutor.class.getName());
+
+ private final Map<Key, Deque<Runnable>> commands = new HashMap<>();
+ private final ExecutorService executor;
+
+ /** Creates a new StripedExecutor which delegates to a {@link Executors#newCachedThreadPool(ThreadFactory)}. */
+ public StripedExecutor() {
+ this(Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(StripedExecutor.class.getName())));
+ }
+
+ /** Creates a new StripedExecutor which delegates to the given executor service. */
+ public StripedExecutor(ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ /**
+ * Executes the given command. If other commands are already running or queued for the given key,
+ * execution of this command happens after those, on the same thread as is running them.
+ * <p>
+ * Any exception thrown by the command will only be logged, to allow subsequent commands to run.
+ */
+ public void execute(Key key, Runnable command) {
+ synchronized (commands) {
+ if (null == commands.putIfAbsent(key, new ArrayDeque<>(List.of(command))))
+ executor.execute(() -> runAll(key));
+ else
+ commands.get(key).add(command);
+ }
+ }
+
+ /** Runs all submitted commands for the given key, then removes the queue for the key and returns. */
+ private void runAll(Key key) {
+ while (true) {
+ Runnable command;
+ synchronized (commands) {
+ command = commands.containsKey(key) ? commands.get(key).poll() : null;
+ if (command == null) {
+ commands.remove(key);
+ break;
+ }
+ }
+ try {
+ command.run();
+ }
+ catch (RuntimeException e) {
+ logger.log(Level.WARNING, () -> "Exception caught: " + Exceptions.toMessageString(e));
+ }
+ }
+ }
+
+ /** Shuts down the delegate executor and waits for it to terminate. */
+ public void shutdownAndWait() {
+ shutdownAndWait(Duration.ofSeconds(30), Duration.ofSeconds(10));
+ }
+
+ /**
+ * Shuts down the delegate executor and waits for the given grace duration for it to terminate.
+ * If this fails, tells the executor to {@link ExecutorService#shutdownNow()}), and waits for the die duration.
+ */
+ public void shutdownAndWait(Duration grace, Duration die) {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(grace.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ logger.log(Level.INFO, "Interrupted waiting for executor to complete", e);
+ }
+ if ( ! executor.isTerminated()) {
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(die.toMillis(), TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e) {
+ logger.log(Level.WARNING, "Interrupted waiting for executor to die", e);
+ }
+ if ( ! executor.isTerminated())
+ throw new RuntimeException("Failed to shut down executor");
+ }
+ }
+
+}
+
diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java
new file mode 100644
index 00000000000..712f24c79d4
--- /dev/null
+++ b/vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java
@@ -0,0 +1,44 @@
+package com.yahoo.concurrent;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author jonmv
+ */
+public class StripedExecutorTest {
+
+ private static final int workers = 1 << 5;
+ private static final int values = 1 << 10;
+
+ @Test
+ public void testSerialization() {
+ AtomicLong counter = new AtomicLong(0);
+ List<Deque<Long>> sequences = new ArrayList<>();
+ for (int j = 0; j < workers; j++)
+ sequences.add(new ConcurrentLinkedDeque<>());
+
+ StripedExecutor<Integer> executor = new StripedExecutor<>();
+ for (int i = 0; i < values; i++)
+ for (int j = 0; j < workers; j++) {
+ Deque<Long> sequence = sequences.get(j);
+ executor.execute(j, () -> sequence.add(counter.incrementAndGet()));
+ }
+ executor.shutdownAndWait();
+
+ for (int j = 0; j < workers; j++) {
+ assertEquals(values, sequences.get(j).size());
+ assertArrayEquals(sequences.get(j).stream().sorted().toArray(Long[]::new),
+ sequences.get(j).toArray(Long[]::new));
+ }
+ }
+
+}