From ae4f0388247496f11b6e878ef597e049b6ada246 Mon Sep 17 00:00:00 2001 From: jonmv Date: Fri, 29 Apr 2022 18:29:09 +0200 Subject: Add Memoized for closeable lazy resources --- .../java/com/yahoo/yolean/concurrent/Memoized.java | 53 +++++++++++++ .../com/yahoo/yolean/concurrent/MemoizedTest.java | 89 ++++++++++++++++++++++ 2 files changed, 142 insertions(+) create mode 100644 yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java create mode 100644 yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java (limited to 'yolean') diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java new file mode 100644 index 00000000000..1d3b8be4232 --- /dev/null +++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java @@ -0,0 +1,53 @@ +package com.yahoo.yolean.concurrent; + +import java.util.Objects; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** + * Wraps a lazily initialised resource which needs to be shut down. + * + * @author jonmv + */ +public class Memoized implements Supplier, AutoCloseable { + + /** Provides a tighter bound on the thrown exception type. */ + @FunctionalInterface + public interface Closer { void close(T t) throws E; } + + private final Object monitor = new Object(); + private final Closer closer; + private volatile T wrapped; + private Supplier factory; + + public Memoized(Supplier factory, Closer closer) { + this.factory = requireNonNull(factory); + this.closer = requireNonNull(closer); + } + + public static Memoized of(Supplier factory) { + return new Memoized<>(factory, AutoCloseable::close); + } + + @Override + public T get() { + if (wrapped == null) synchronized (monitor) { + if (factory != null) wrapped = factory.get(); + factory = null; + if (wrapped == null) throw new IllegalStateException("already closed"); + } + return wrapped; + } + + @Override + public void close() throws E { + synchronized (monitor) { + T maybe = wrapped; + wrapped = null; + factory = null; + if (maybe != null) closer.close(maybe); + } + } + +} \ No newline at end of file diff --git a/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java new file mode 100644 index 00000000000..5f6268ce7da --- /dev/null +++ b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java @@ -0,0 +1,89 @@ +package com.yahoo.yolean.concurrent; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; + +/** + * @author jonmv + */ +public class MemoizedTest { + + final Phaser phaser = new Phaser(); + final int threads = 128; + + @Test + public void test() throws ExecutionException, InterruptedException { + var lazy = new Memoized<>(new OnceSupplier(), OnceCloseable::close); + phaser.register(); // test thread + phaser.register(); // whoever calls the factory + + Phaser latch = new Phaser(threads + 1); + ExecutorService executor = Executors.newFixedThreadPool(threads); + List> futures = new ArrayList<>(); + for (int i = 0; i < 128; i++) { + futures.add(executor.submit(() -> { + latch.arriveAndAwaitAdvance(); + lazy.get().rendezvous(); + while (true) lazy.get(); + })); + } + + // All threads waiting for latch, will race to factory + latch.arriveAndAwaitAdvance(); + + // One thread waiting in factory, the others are blocked, will go to rendezvous + phaser.arriveAndAwaitAdvance(); + + // All threads waiting in rendezvous, will repeatedly get until failure + phaser.arriveAndAwaitAdvance(); + + // Unsynchronized close should be detected by all threads + lazy.close(); + + // Close should carry through only once + lazy.close(); + + assertEquals("already closed", + assertThrows(IllegalStateException.class, lazy::get).getMessage()); + + for (Future future : futures) + assertEquals("java.lang.IllegalStateException: already closed", + assertThrows(ExecutionException.class, future::get).getMessage()); + + executor.shutdown(); + } + + class OnceSupplier implements Supplier { + final AtomicBoolean initialized = new AtomicBoolean(); + @Override public OnceCloseable get() { + phaser.arriveAndAwaitAdvance(); + if ( ! initialized.compareAndSet(false, true)) fail("initialized more than once"); + phaser.bulkRegister(threads - 1); // register all the threads who didn't get the factory + return new OnceCloseable(); + } + } + + class OnceCloseable implements AutoCloseable { + final AtomicBoolean closed = new AtomicBoolean(); + @Override public void close() { + if ( ! closed.compareAndSet(false, true)) fail("closed more than once"); + } + void rendezvous() { + phaser.arriveAndAwaitAdvance(); + } + } + +} -- cgit v1.2.3