summaryrefslogtreecommitdiffstats
path: root/yolean
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2022-04-29 18:29:09 +0200
committerjonmv <venstad@gmail.com>2022-05-02 12:02:02 +0200
commitae4f0388247496f11b6e878ef597e049b6ada246 (patch)
tree6d5d3c2ff5a99f3628a5e28385bb9ce4c858e745 /yolean
parenta817a14d2a781ee8fc12c89bba08828fd00ba5cb (diff)
Add Memoized for closeable lazy resources
Diffstat (limited to 'yolean')
-rw-r--r--yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java53
-rw-r--r--yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java89
2 files changed, 142 insertions, 0 deletions
diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java
new file mode 100644
index 00000000000..1d3b8be4232
--- /dev/null
+++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java
@@ -0,0 +1,53 @@
+package com.yahoo.yolean.concurrent;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Wraps a lazily initialised resource which needs to be shut down.
+ *
+ * @author jonmv
+ */
+public class Memoized<T, E extends Exception> implements Supplier<T>, AutoCloseable {
+
+ /** Provides a tighter bound on the thrown exception type. */
+ @FunctionalInterface
+ public interface Closer<T, E extends Exception> { void close(T t) throws E; }
+
+ private final Object monitor = new Object();
+ private final Closer<T, E> closer;
+ private volatile T wrapped;
+ private Supplier<T> factory;
+
+ public Memoized(Supplier<T> factory, Closer<T, E> closer) {
+ this.factory = requireNonNull(factory);
+ this.closer = requireNonNull(closer);
+ }
+
+ public static <T extends AutoCloseable> Memoized<T, ?> of(Supplier<T> factory) {
+ return new Memoized<>(factory, AutoCloseable::close);
+ }
+
+ @Override
+ public T get() {
+ if (wrapped == null) synchronized (monitor) {
+ if (factory != null) wrapped = factory.get();
+ factory = null;
+ if (wrapped == null) throw new IllegalStateException("already closed");
+ }
+ return wrapped;
+ }
+
+ @Override
+ public void close() throws E {
+ synchronized (monitor) {
+ T maybe = wrapped;
+ wrapped = null;
+ factory = null;
+ if (maybe != null) closer.close(maybe);
+ }
+ }
+
+} \ No newline at end of file
diff --git a/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java
new file mode 100644
index 00000000000..5f6268ce7da
--- /dev/null
+++ b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java
@@ -0,0 +1,89 @@
+package com.yahoo.yolean.concurrent;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+/**
+ * @author jonmv
+ */
+public class MemoizedTest {
+
+ final Phaser phaser = new Phaser();
+ final int threads = 128;
+
+ @Test
+ public void test() throws ExecutionException, InterruptedException {
+ var lazy = new Memoized<>(new OnceSupplier(), OnceCloseable::close);
+ phaser.register(); // test thread
+ phaser.register(); // whoever calls the factory
+
+ Phaser latch = new Phaser(threads + 1);
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < 128; i++) {
+ futures.add(executor.submit(() -> {
+ latch.arriveAndAwaitAdvance();
+ lazy.get().rendezvous();
+ while (true) lazy.get();
+ }));
+ }
+
+ // All threads waiting for latch, will race to factory
+ latch.arriveAndAwaitAdvance();
+
+ // One thread waiting in factory, the others are blocked, will go to rendezvous
+ phaser.arriveAndAwaitAdvance();
+
+ // All threads waiting in rendezvous, will repeatedly get until failure
+ phaser.arriveAndAwaitAdvance();
+
+ // Unsynchronized close should be detected by all threads
+ lazy.close();
+
+ // Close should carry through only once
+ lazy.close();
+
+ assertEquals("already closed",
+ assertThrows(IllegalStateException.class, lazy::get).getMessage());
+
+ for (Future<?> future : futures)
+ assertEquals("java.lang.IllegalStateException: already closed",
+ assertThrows(ExecutionException.class, future::get).getMessage());
+
+ executor.shutdown();
+ }
+
+ class OnceSupplier implements Supplier<OnceCloseable> {
+ final AtomicBoolean initialized = new AtomicBoolean();
+ @Override public OnceCloseable get() {
+ phaser.arriveAndAwaitAdvance();
+ if ( ! initialized.compareAndSet(false, true)) fail("initialized more than once");
+ phaser.bulkRegister(threads - 1); // register all the threads who didn't get the factory
+ return new OnceCloseable();
+ }
+ }
+
+ class OnceCloseable implements AutoCloseable {
+ final AtomicBoolean closed = new AtomicBoolean();
+ @Override public void close() {
+ if ( ! closed.compareAndSet(false, true)) fail("closed more than once");
+ }
+ void rendezvous() {
+ phaser.arriveAndAwaitAdvance();
+ }
+ }
+
+}