From a9d67ff3662ef5020ce51197b698289753a83399 Mon Sep 17 00:00:00 2001 From: Jon Marius Venstad Date: Fri, 24 May 2019 13:24:44 +0200 Subject: Add StripedExecutor for serialising tasks per key --- .../java/com/yahoo/concurrent/StripedExecutor.java | 106 +++++++++++++++++++++ .../com/yahoo/concurrent/StripedExecutorTest.java | 44 +++++++++ 2 files changed, 150 insertions(+) create mode 100644 vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java create mode 100644 vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java (limited to 'vespajlib/src') diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java b/vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java new file mode 100644 index 00000000000..43a170230ad --- /dev/null +++ b/vespajlib/src/main/java/com/yahoo/concurrent/StripedExecutor.java @@ -0,0 +1,106 @@ +package com.yahoo.concurrent; + +import com.yahoo.yolean.Exceptions; + +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Executor that serializes runnables with the same key, but may parallelize over different keys. + * + * @author jonmv + */ +public class StripedExecutor { + + private static final Logger logger = Logger.getLogger(StripedExecutor.class.getName()); + + private final Map> commands = new HashMap<>(); + private final ExecutorService executor; + + /** Creates a new StripedExecutor which delegates to a {@link Executors#newCachedThreadPool(ThreadFactory)}. */ + public StripedExecutor() { + this(Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory(StripedExecutor.class.getName()))); + } + + /** Creates a new StripedExecutor which delegates to the given executor service. */ + public StripedExecutor(ExecutorService executor) { + this.executor = executor; + } + + /** + * Executes the given command. If other commands are already running or queued for the given key, + * execution of this command happens after those, on the same thread as is running them. + *

+ * Any exception thrown by the command will only be logged, to allow subsequent commands to run. + */ + public void execute(Key key, Runnable command) { + synchronized (commands) { + if (null == commands.putIfAbsent(key, new ArrayDeque<>(List.of(command)))) + executor.execute(() -> runAll(key)); + else + commands.get(key).add(command); + } + } + + /** Runs all submitted commands for the given key, then removes the queue for the key and returns. */ + private void runAll(Key key) { + while (true) { + Runnable command; + synchronized (commands) { + command = commands.containsKey(key) ? commands.get(key).poll() : null; + if (command == null) { + commands.remove(key); + break; + } + } + try { + command.run(); + } + catch (RuntimeException e) { + logger.log(Level.WARNING, () -> "Exception caught: " + Exceptions.toMessageString(e)); + } + } + } + + /** Shuts down the delegate executor and waits for it to terminate. */ + public void shutdownAndWait() { + shutdownAndWait(Duration.ofSeconds(30), Duration.ofSeconds(10)); + } + + /** + * Shuts down the delegate executor and waits for the given grace duration for it to terminate. + * If this fails, tells the executor to {@link ExecutorService#shutdownNow()}), and waits for the die duration. + */ + public void shutdownAndWait(Duration grace, Duration die) { + executor.shutdown(); + try { + executor.awaitTermination(grace.toMillis(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + logger.log(Level.INFO, "Interrupted waiting for executor to complete", e); + } + if ( ! executor.isTerminated()) { + executor.shutdownNow(); + try { + executor.awaitTermination(die.toMillis(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted waiting for executor to die", e); + } + if ( ! executor.isTerminated()) + throw new RuntimeException("Failed to shut down executor"); + } + } + +} + diff --git a/vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java b/vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java new file mode 100644 index 00000000000..712f24c79d4 --- /dev/null +++ b/vespajlib/src/test/java/com/yahoo/concurrent/StripedExecutorTest.java @@ -0,0 +1,44 @@ +package com.yahoo.concurrent; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * @author jonmv + */ +public class StripedExecutorTest { + + private static final int workers = 1 << 5; + private static final int values = 1 << 10; + + @Test + public void testSerialization() { + AtomicLong counter = new AtomicLong(0); + List> sequences = new ArrayList<>(); + for (int j = 0; j < workers; j++) + sequences.add(new ConcurrentLinkedDeque<>()); + + StripedExecutor executor = new StripedExecutor<>(); + for (int i = 0; i < values; i++) + for (int j = 0; j < workers; j++) { + Deque sequence = sequences.get(j); + executor.execute(j, () -> sequence.add(counter.incrementAndGet())); + } + executor.shutdownAndWait(); + + for (int j = 0; j < workers; j++) { + assertEquals(values, sequences.get(j).size()); + assertArrayEquals(sequences.get(j).stream().sorted().toArray(Long[]::new), + sequences.get(j).toArray(Long[]::new)); + } + } + +} -- cgit v1.2.3