aboutsummaryrefslogtreecommitdiffstats
path: root/jrt
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 20:59:13 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-25 20:59:13 +0000
commit6c3f5eedc1d288cc1e5e71a2cbd7da02d5415016 (patch)
treec80e8d72c421cd2fa02abb5ce0456dd3fe43405f /jrt
parente8f646592fce73288f40bf854bf274e7dca0f475 (diff)
Factor out generic thread pool to vespajlib.
Diffstat (limited to 'jrt')
-rw-r--r--jrt/src/com/yahoo/jrt/Connector.java88
1 files changed, 13 insertions, 75 deletions
diff --git a/jrt/src/com/yahoo/jrt/Connector.java b/jrt/src/com/yahoo/jrt/Connector.java
index b0eeffda469..10f2b3742f2 100644
--- a/jrt/src/com/yahoo/jrt/Connector.java
+++ b/jrt/src/com/yahoo/jrt/Connector.java
@@ -1,104 +1,41 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.jrt;
-import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.concurrent.CachedThreadPoolWithFallback;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
class Connector implements AutoCloseable {
- //TODO Move to vespajlib as utility as java does not have what we want.
private static final Object globalLock = new Object();
- private static ExecutorService globalPrimaryExecutor = null;
- private static ExecutorService globalFallbackExecutor = null;
+ private static CachedThreadPoolWithFallback globalExecutor = null;
private static long usages = 0;
- private static class ExecutorWithFallback implements Executor {
- private final Executor primary;
- private final Executor secondary;
- ExecutorWithFallback(Executor primary, Executor secondary) {
- this.primary = primary;
- this.secondary = secondary;
- }
-
- @Override
- public void execute(Runnable command) {
- try {
- primary.execute(command);
- } catch (RejectedExecutionException e1) {
- secondary.execute(() -> retryForever(command));
- }
- }
- private void retryForever(Runnable command) {
- while (true) {
- try {
- primary.execute(command);
- return;
- } catch (RejectedExecutionException rejected) {
- try {
- Thread.sleep(1);
- } catch (InterruptedException silenced) { }
- }
- }
- }
- }
-
- private static ExecutorWithFallback acquire() {
+ private static CachedThreadPoolWithFallback acquire() {
synchronized (globalLock) {
- if (globalPrimaryExecutor == null) {
- if (globalFallbackExecutor != null) {
- throw new IllegalStateException("fallback executor must be null !");
- }
- if (usages != 0) {
- throw new IllegalStateException("usages " + usages + " != 0");
- }
- globalPrimaryExecutor = new ThreadPoolExecutor(1, 64, 1L, TimeUnit.SECONDS,
- new SynchronousQueue<>(), ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.primary"));
- globalFallbackExecutor = Executors.newSingleThreadExecutor(ThreadFactoryFactory.getDaemonThreadFactory("jrt.connector.fallback"));
+ if (globalExecutor == null) {
+ globalExecutor = new CachedThreadPoolWithFallback("jrt.connector", 1, 64, 1L, TimeUnit.SECONDS);
}
usages++;
- return new ExecutorWithFallback(globalPrimaryExecutor, globalFallbackExecutor);
+ return globalExecutor;
}
}
- private static void join(ExecutorService executor) {
- while (true) {
- try {
- if (executor.awaitTermination(60, TimeUnit.SECONDS)) {
- return;
- }
- } catch (InterruptedException e) {}
- }
- }
-
- private static void release(ExecutorWithFallback executor) {
+ private static void release(CachedThreadPoolWithFallback executor) {
synchronized (globalLock) {
- if (executor.primary != globalPrimaryExecutor) {
- throw new IllegalStateException("primary executor " + executor.primary + " != " + globalPrimaryExecutor);
- }
- if (executor.secondary != globalFallbackExecutor) {
- throw new IllegalStateException("secondary executor " + executor.secondary + " != " + globalFallbackExecutor);
- }
+ assert executor == globalExecutor;
usages--;
if (usages == 0) {
- globalPrimaryExecutor.shutdown();
- globalFallbackExecutor.shutdown();
- join(globalPrimaryExecutor);
- globalPrimaryExecutor = null;
- join(globalFallbackExecutor);
- globalFallbackExecutor = null;
+ globalExecutor.close();
+ globalExecutor = null;
}
}
}
- private final AtomicReference<ExecutorWithFallback> executor;
+ private final AtomicReference<CachedThreadPoolWithFallback> executor;
Connector() {
executor = new AtomicReference<>(acquire());
@@ -120,8 +57,9 @@ class Connector implements AutoCloseable {
conn.transportThread().addConnection(conn);
}
+ @Override
public void close() {
- ExecutorWithFallback toShutdown = executor.getAndSet(null);
+ CachedThreadPoolWithFallback toShutdown = executor.getAndSet(null);
if (toShutdown != null) {
release(toShutdown);
}