diff options
author | jonmv <venstad@gmail.com> | 2022-05-03 07:30:55 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2022-05-03 07:30:55 +0200 |
commit | 3592e408848787f75e721bf2d6f99f3f010f6610 (patch) | |
tree | 8a41d51e4bbc420bc1dbc65d76e79e3f8ab423d3 /yolean/src | |
parent | e046ae79779261b09f85bcf4c04c906b83075775 (diff) |
Revert "Merge pull request #22394 from vespa-engine/revert-22374-jonmv/remove-last-controller-jersey-client"
This reverts commit e046ae79779261b09f85bcf4c04c906b83075775, reversing
changes made to d2066c0a0c04e2aa2ada12a5c85f5eae9ff65b02.
Diffstat (limited to 'yolean/src')
-rw-r--r-- | yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java | 64 | ||||
-rw-r--r-- | yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java | 101 |
2 files changed, 165 insertions, 0 deletions
diff --git a/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java new file mode 100644 index 00000000000..e8660504a8a --- /dev/null +++ b/yolean/src/main/java/com/yahoo/yolean/concurrent/Memoized.java @@ -0,0 +1,64 @@ +package com.yahoo.yolean.concurrent; + +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** + * Wraps a lazily initialised resource which needs to be shut down. + * The wrapped supplier may not return {@code null}, and should be retryable on failure. + * If it throws, it will be retried if {@link #get} is retried. A supplier that fails to + * clean up partial state on failure may cause a resource leak. + * + * @author jonmv + */ +public class Memoized<T, E extends Exception> implements Supplier<T>, AutoCloseable { + + /** Provides a tighter bound on the thrown exception type. */ + @FunctionalInterface + public interface Closer<T, E extends Exception> { void close(T t) throws E; } + + private final Object monitor = new Object(); + private final Closer<T, E> closer; + private volatile T wrapped; + private Supplier<T> factory; + + public Memoized(Supplier<T> factory, Closer<T, E> closer) { + this.factory = requireNonNull(factory); + this.closer = requireNonNull(closer); + } + + public static <T extends AutoCloseable> Memoized<T, ?> of(Supplier<T> factory) { + return new Memoized<>(factory, AutoCloseable::close); + } + + @Override + public T get() { + // Double-checked locking: try the variable, and if not initialized, try to initialize it. + if (wrapped == null) synchronized (monitor) { + // Ensure the factory is called only once, by clearing it once successfully called. + if (factory != null) wrapped = requireNonNull(factory.get()); + factory = null; + + // If we found the factory, we won the initialization race, and return normally; otherwise + // if wrapped is non-null, we lost the race, wrapped was set by the winner, and we return; otherwise + // we tried to initialise because wrapped was cleared by closing this, and we fail. + if (wrapped == null) throw new IllegalStateException("already closed"); + } + return wrapped; + } + + @Override + public void close() throws E { + // Alter state only when synchronized with calls to get(). + synchronized (monitor) { + // Ensure we only try to close the generated resource once, by clearing it after picking it up here. + T maybe = wrapped; + wrapped = null; + // Clear the factory, to signal this has been closed. + factory = null; + if (maybe != null) closer.close(maybe); + } + } + +}
\ No newline at end of file diff --git a/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java new file mode 100644 index 00000000000..7f2f49c75f2 --- /dev/null +++ b/yolean/src/test/java/com/yahoo/yolean/concurrent/MemoizedTest.java @@ -0,0 +1,101 @@ +package com.yahoo.yolean.concurrent; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; + +/** + * @author jonmv + */ +public class MemoizedTest { + + final Phaser phaser = new Phaser(); + final int threads = 128; + + @Test + public void test() throws ExecutionException, InterruptedException { + var lazy = new Memoized<>(new OnceSupplier(), OnceCloseable::close); + phaser.register(); // test thread + phaser.register(); // whoever calls the factory + + Phaser latch = new Phaser(threads + 1); + ExecutorService executor = Executors.newFixedThreadPool(threads); + List<Future<?>> futures = new ArrayList<>(); + for (int i = 0; i < 128; i++) { + futures.add(executor.submit(() -> { + latch.arriveAndAwaitAdvance(); + lazy.get().rendezvous(); + while (true) lazy.get(); + })); + } + + // All threads waiting for latch, will race to factory + latch.arriveAndAwaitAdvance(); + + // One thread waiting in factory, the others are blocked, will go to rendezvous + phaser.arriveAndAwaitAdvance(); + + // All threads waiting in rendezvous, will repeatedly get until failure + phaser.arriveAndAwaitAdvance(); + + // Unsynchronized close should be detected by all threads + lazy.close(); + + // Close should carry through only once + lazy.close(); + + assertEquals("already closed", + assertThrows(IllegalStateException.class, lazy::get).getMessage()); + + for (Future<?> future : futures) + assertEquals("java.lang.IllegalStateException: already closed", + assertThrows(ExecutionException.class, future::get).getMessage()); + + executor.shutdown(); + } + + @Test + public void closeBeforeFirstGet() throws Exception { + OnceSupplier supplier = new OnceSupplier(); + Memoized<OnceCloseable, ?> lazy = Memoized.of(supplier); + lazy.close(); + assertEquals("already closed", + assertThrows(IllegalStateException.class, lazy::get).getMessage()); + lazy.close(); + assertFalse(supplier.initialized.get()); + } + + class OnceSupplier implements Supplier<OnceCloseable> { + final AtomicBoolean initialized = new AtomicBoolean(); + @Override public OnceCloseable get() { + phaser.arriveAndAwaitAdvance(); + if ( ! initialized.compareAndSet(false, true)) fail("initialized more than once"); + phaser.bulkRegister(threads - 1); // register all the threads who didn't get the factory + return new OnceCloseable(); + } + } + + class OnceCloseable implements AutoCloseable { + final AtomicBoolean closed = new AtomicBoolean(); + @Override public void close() { + if ( ! closed.compareAndSet(false, true)) fail("closed more than once"); + } + void rendezvous() { + phaser.arriveAndAwaitAdvance(); + } + } + +} |