aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 15:18:21 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 15:18:21 +0000
commit62ea092d9fba83670a6b9923ec776d910c331d37 (patch)
tree01142e17acc2d582c0090054389f08b420722f49 /jrt
parentd7d87a8e45e0dace606493dfed2f903a614b76f7 (diff)
Add sanity checks and avoid codeduplicationi after codereview.
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java40
1 files changed, 24 insertions, 16 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
index 716801b4d0e..b0eeffda469 100644
--- a/jrt/src/com/yahoo/jrt/Connector.java
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicReference;
class Connector implements AutoCloseable {
+ //TODO Move to vespajlib as utility as java does not have what we want.
private static final Object globalLock = new Object();
private static ExecutorService globalPrimaryExecutor = null;
private static ExecutorService globalFallbackExecutor = null;
@@ -32,10 +33,10 @@ class Connector implements AutoCloseable {
try {
primary.execute(command);
} catch (RejectedExecutionException e1) {
- secondary.execute(() -> retryForEver(command));
+ secondary.execute(() -> retryForever(command));
}
}
- private void retryForEver(Runnable command) {
+ private void retryForever(Runnable command) {
while (true) {
try {
primary.execute(command);
@@ -52,15 +53,31 @@ class Connector implements AutoCloseable {
private static ExecutorWithFallback acquire() {
synchronized (globalLock) {
if (globalPrimaryExecutor == null) {
+ if (globalFallbackExecutor != null) {
+ throw new IllegalStateException("fallback executor must be null !");
+ }
+ if (usages != 0) {
+ throw new IllegalStateException("usages " + usages + " != 0");
+ }
globalPrimaryExecutor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS,
new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.primary"));
globalFallbackExecutor = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.fallback"));
}
- usages ++;
+ usages++;
return new ExecutorWithFallback(globalPrimaryExecutor, globalFallbackExecutor);
}
}
+ private static void join(ExecutorService executor) {
+ while (true) {
+ try {
+ if (executor.awaitTermination(60, TimeUnit.SECONDS)) {
+ return;
+ }
+ } catch (InterruptedException e) {}
+ }
+ }
+
private static void release(ExecutorWithFallback executor) {
synchronized (globalLock) {
if (executor.primary != globalPrimaryExecutor) {
@@ -73,19 +90,10 @@ class Connector implements AutoCloseable {
if (usages == 0) {
globalPrimaryExecutor.shutdown();
globalFallbackExecutor.shutdown();
- while (true) {
- try {
- if (globalPrimaryExecutor != null && globalPrimaryExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
- globalPrimaryExecutor = null;
- }
- if (globalFallbackExecutor != null && globalFallbackExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
- globalFallbackExecutor = null;
- }
- if (globalFallbackExecutor == null && globalFallbackExecutor == null) {
- return;
- }
- } catch (InterruptedException e) {}
- }
+ join(globalPrimaryExecutor);
+ globalPrimaryExecutor = null;
+ join(globalFallbackExecutor);
+ globalFallbackExecutor = null;
}
}
}