summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2021-04-26 18:45:24 +0200
committerHarald Musum <musum@verizonmedia.com>2021-04-26 18:45:24 +0200
commit2895045913dec0162d947dfdab853af2f0e04742 (patch)
treed35e64e7a11fbc53bc15a72b5a161ba97cf83601 /configserver
parentc742485abd7ebaa5d325a6b59c9bffc393b907e6 (diff)
Use a single-threaded executor that delegates to another for ZK session work
Use a single-threaded executopr to make sure that events are handled in order, but delegate to a multi-threaded executor for the real work
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java5
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java43
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java6
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java2
4 files changed, 45 insertions, 11 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index 181ed880fd7..91ef14e3394 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -5,7 +5,6 @@ import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.concurrent.DaemonThreadFactory;
-import com.yahoo.concurrent.StripedExecutor;
import com.yahoo.config.FileReference;
import com.yahoo.config.application.api.ApplicationPackage;
import com.yahoo.config.application.api.DeployLogger;
@@ -128,7 +127,7 @@ public class SessionRepository {
SessionPreparer sessionPreparer,
ConfigCurator configCurator,
Metrics metrics,
- StripedExecutor<TenantName> zkWatcherExecutor,
+ ExecutorService zkWatcherExecutor,
PermanentApplicationPackage permanentApplicationPackage,
FlagSource flagSource,
ExecutorService zkCacheExecutor,
@@ -148,7 +147,7 @@ public class SessionRepository {
this.clock = clock;
this.curator = configCurator.curator();
this.sessionLifetime = Duration.ofSeconds(configserverConfig.sessionLifetime());
- this.zkWatcherExecutor = command -> zkWatcherExecutor.execute(tenantName, command);
+ this.zkWatcherExecutor = zkWatcherExecutor;
this.permanentApplicationPackage = permanentApplicationPackage;
this.flagSource = flagSource;
this.tenantFileSystemDirs = new TenantFileSystemDirs(configServerDB, tenantName);
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
index 52d1484897c..d0780a5b2d8 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java
@@ -58,8 +58,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -103,7 +105,7 @@ public class TenantRepository {
private final Metrics metrics;
private final MetricUpdater metricUpdater;
private final ExecutorService zkCacheExecutor;
- private final StripedExecutor<TenantName> zkSessionWatcherExecutor;
+ private final ExecutorService zkSessionWatcherExecutor;
private final StripedExecutor<TenantName> zkApplicationWatcherExecutor;
private final FileDistributionFactory fileDistributionFactory;
private final FlagSource flagSource;
@@ -142,7 +144,7 @@ public class TenantRepository {
configCurator,
metrics,
new StripedExecutor<>(),
- new StripedExecutor<>(),
+ new ZkWatcherExecutorService(),
new FileDistributionFactory(configserverConfig),
flagSource,
Executors.newFixedThreadPool(1, ThreadFactoryFactory.getThreadFactory(TenantRepository.class.getName())),
@@ -162,7 +164,7 @@ public class TenantRepository {
ConfigCurator configCurator,
Metrics metrics,
StripedExecutor<TenantName> zkApplicationWatcherExecutor ,
- StripedExecutor<TenantName> zkSessionWatcherExecutor,
+ ExecutorService zkSessionWatcherExecutor,
FileDistributionFactory fileDistributionFactory,
FlagSource flagSource,
ExecutorService zkCacheExecutor,
@@ -542,9 +544,10 @@ public class TenantRepository {
zkCacheExecutor.shutdown();
checkForRemovedApplicationsService.shutdown();
zkApplicationWatcherExecutor.shutdownAndWait();
- zkSessionWatcherExecutor.shutdownAndWait();
+ zkSessionWatcherExecutor.shutdown();
zkCacheExecutor.awaitTermination(50, TimeUnit.SECONDS);
checkForRemovedApplicationsService.awaitTermination(50, TimeUnit.SECONDS);
+ zkSessionWatcherExecutor.awaitTermination(50, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
log.log(Level.WARNING, "Interrupted while shutting down.", e);
@@ -612,4 +615,36 @@ public class TenantRepository {
public Curator getCurator() { return curator; }
+ /**
+ * Single-threaded executor (to make sure that getting data for a ZooKeeper watcher event is done serially) that
+ * delegates to another executor (that uses a cached thread pool)
+ */
+ private static class ZkWatcherExecutorService extends ThreadPoolExecutor {
+
+ private final ExecutorService executorService =
+ Executors.newCachedThreadPool(new DaemonThreadFactory("zk-session-watcher-"));
+
+ public ZkWatcherExecutorService() {
+ super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ super.execute(() -> executorService.execute(command));
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ executorService.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ super.shutdownNow();
+ return executorService.shutdownNow();
+ }
+
+ }
+
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java
index 6f7e0541cc7..36ce3eaac32 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java
@@ -172,8 +172,8 @@ public class TenantRepositoryTest {
try {
tenantRepository.addTenant(newTenant);
// Poll for the watcher to pick up the tenant from zk, and add it
- int tries=0;
- while(true) {
+ int tries = 0;
+ while (true) {
if (tries > 5000) fail("Didn't react on watch");
if (tenantRepository.getAllTenantNames().containsAll(expectedTenants)) {
break;
@@ -212,7 +212,7 @@ public class TenantRepositoryTest {
ConfigCurator.create(new MockCurator()),
Metrics.createTestMetrics(),
new StripedExecutor<>(new InThreadExecutorService()),
- new StripedExecutor<>(new InThreadExecutorService()),
+ new InThreadExecutorService(),
new FileDistributionFactory(new ConfigserverConfig.Builder().build()),
new InMemoryFlagSource(),
new InThreadExecutorService(),
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java
index 687d58fd23b..40f046b8b05 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java
@@ -50,7 +50,7 @@ public class TestTenantRepository extends TenantRepository {
ConfigCurator.create(curator),
metrics,
new StripedExecutor<>(new InThreadExecutorService()),
- new StripedExecutor<>(new InThreadExecutorService()),
+ new InThreadExecutorService(),
fileDistributionFactory,
flagSource,
new InThreadExecutorService(),