From 62ea092d9fba83670a6b9923ec776d910c331d37 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 25 Feb 2020 15:18:21 +0000 Subject: Add sanity checks and avoid codeduplicationi after codereview. --- jrt/src/com/yahoo/jrt/Connector.java | 40 +++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 16 deletions(-) (limited to 'jrt/src') diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java index 716801b4d0e..b0eeffda469 100644 --- a/jrt/src/com/yahoo/jrt/Connector.java +++ b/jrt/src/com/yahoo/jrt/Connector.java @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicReference; class Connector implements AutoCloseable { + //TODO Move to vespajlib as utility as java does not have what we want. private static final Object globalLock = new Object(); private static ExecutorService globalPrimaryExecutor = null; private static ExecutorService globalFallbackExecutor = null; @@ -32,10 +33,10 @@ class Connector implements AutoCloseable { try { primary.execute(command); } catch (RejectedExecutionException e1) { - secondary.execute(() -> retryForEver(command)); + secondary.execute(() -> retryForever(command)); } } - private void retryForEver(Runnable command) { + private void retryForever(Runnable command) { while (true) { try { primary.execute(command); @@ -52,15 +53,31 @@ class Connector implements AutoCloseable { private static ExecutorWithFallback acquire() { synchronized (globalLock) { if (globalPrimaryExecutor == null) { + if (globalFallbackExecutor != null) { + throw new IllegalStateException("fallback executor must be null !"); + } + if (usages != 0) { + throw new IllegalStateException("usages " + usages + " != 0"); + } globalPrimaryExecutor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.primary")); globalFallbackExecutor = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.fallback")); } - usages ++; + usages++; return new ExecutorWithFallback(globalPrimaryExecutor, globalFallbackExecutor); } } + private static void join(ExecutorService executor) { + while (true) { + try { + if (executor.awaitTermination(60, TimeUnit.SECONDS)) { + return; + } + } catch (InterruptedException e) {} + } + } + private static void release(ExecutorWithFallback executor) { synchronized (globalLock) { if (executor.primary != globalPrimaryExecutor) { @@ -73,19 +90,10 @@ class Connector implements AutoCloseable { if (usages == 0) { globalPrimaryExecutor.shutdown(); globalFallbackExecutor.shutdown(); - while (true) { - try { - if (globalPrimaryExecutor != null && globalPrimaryExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - globalPrimaryExecutor = null; - } - if (globalFallbackExecutor != null && globalFallbackExecutor.awaitTermination(60, TimeUnit.SECONDS)) { - globalFallbackExecutor = null; - } - if (globalFallbackExecutor == null && globalFallbackExecutor == null) { - return; - } - } catch (InterruptedException e) {} - } + join(globalPrimaryExecutor); + globalPrimaryExecutor = null; + join(globalFallbackExecutor); + globalFallbackExecutor = null; } } } -- cgit v1.2.3