diff options
author | Harald Musum <musum@verizonmedia.com> | 2021-04-26 18:45:24 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2021-04-26 18:45:24 +0200 |
commit | 2895045913dec0162d947dfdab853af2f0e04742 (patch) | |
tree | d35e64e7a11fbc53bc15a72b5a161ba97cf83601 /configserver | |
parent | c742485abd7ebaa5d325a6b59c9bffc393b907e6 (diff) |
Use a single-threaded executor that delegates to another for ZK session work
Use a single-threaded executopr to make sure that events are handled in order,
but delegate to a multi-threaded executor for the real work
Diffstat (limited to 'configserver')
4 files changed, 45 insertions, 11 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index 181ed880fd7..91ef14e3394 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -5,7 +5,6 @@ import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.concurrent.DaemonThreadFactory; -import com.yahoo.concurrent.StripedExecutor; import com.yahoo.config.FileReference; import com.yahoo.config.application.api.ApplicationPackage; import com.yahoo.config.application.api.DeployLogger; @@ -128,7 +127,7 @@ public class SessionRepository { SessionPreparer sessionPreparer, ConfigCurator configCurator, Metrics metrics, - StripedExecutor<TenantName> zkWatcherExecutor, + ExecutorService zkWatcherExecutor, PermanentApplicationPackage permanentApplicationPackage, FlagSource flagSource, ExecutorService zkCacheExecutor, @@ -148,7 +147,7 @@ public class SessionRepository { this.clock = clock; this.curator = configCurator.curator(); this.sessionLifetime = Duration.ofSeconds(configserverConfig.sessionLifetime()); - this.zkWatcherExecutor = command -> zkWatcherExecutor.execute(tenantName, command); + this.zkWatcherExecutor = zkWatcherExecutor; this.permanentApplicationPackage = permanentApplicationPackage; this.flagSource = flagSource; this.tenantFileSystemDirs = new TenantFileSystemDirs(configServerDB, tenantName); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index 52d1484897c..d0780a5b2d8 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -58,8 +58,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -103,7 +105,7 @@ public class TenantRepository { private final Metrics metrics; private final MetricUpdater metricUpdater; private final ExecutorService zkCacheExecutor; - private final StripedExecutor<TenantName> zkSessionWatcherExecutor; + private final ExecutorService zkSessionWatcherExecutor; private final StripedExecutor<TenantName> zkApplicationWatcherExecutor; private final FileDistributionFactory fileDistributionFactory; private final FlagSource flagSource; @@ -142,7 +144,7 @@ public class TenantRepository { configCurator, metrics, new StripedExecutor<>(), - new StripedExecutor<>(), + new ZkWatcherExecutorService(), new FileDistributionFactory(configserverConfig), flagSource, Executors.newFixedThreadPool(1, ThreadFactoryFactory.getThreadFactory(TenantRepository.class.getName())), @@ -162,7 +164,7 @@ public class TenantRepository { ConfigCurator configCurator, Metrics metrics, StripedExecutor<TenantName> zkApplicationWatcherExecutor , - StripedExecutor<TenantName> zkSessionWatcherExecutor, + ExecutorService zkSessionWatcherExecutor, FileDistributionFactory fileDistributionFactory, FlagSource flagSource, ExecutorService zkCacheExecutor, @@ -542,9 +544,10 @@ public class TenantRepository { zkCacheExecutor.shutdown(); checkForRemovedApplicationsService.shutdown(); zkApplicationWatcherExecutor.shutdownAndWait(); - zkSessionWatcherExecutor.shutdownAndWait(); + zkSessionWatcherExecutor.shutdown(); zkCacheExecutor.awaitTermination(50, TimeUnit.SECONDS); checkForRemovedApplicationsService.awaitTermination(50, TimeUnit.SECONDS); + zkSessionWatcherExecutor.awaitTermination(50, TimeUnit.SECONDS); } catch (InterruptedException e) { log.log(Level.WARNING, "Interrupted while shutting down.", e); @@ -612,4 +615,36 @@ public class TenantRepository { public Curator getCurator() { return curator; } + /** + * Single-threaded executor (to make sure that getting data for a ZooKeeper watcher event is done serially) that + * delegates to another executor (that uses a cached thread pool) + */ + private static class ZkWatcherExecutorService extends ThreadPoolExecutor { + + private final ExecutorService executorService = + Executors.newCachedThreadPool(new DaemonThreadFactory("zk-session-watcher-")); + + public ZkWatcherExecutorService() { + super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + } + + @Override + public void execute(Runnable command) { + super.execute(() -> executorService.execute(command)); + } + + @Override + public void shutdown() { + super.shutdown(); + executorService.shutdown(); + } + + @Override + public List<Runnable> shutdownNow() { + super.shutdownNow(); + return executorService.shutdownNow(); + } + + } + } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java index 6f7e0541cc7..36ce3eaac32 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TenantRepositoryTest.java @@ -172,8 +172,8 @@ public class TenantRepositoryTest { try { tenantRepository.addTenant(newTenant); // Poll for the watcher to pick up the tenant from zk, and add it - int tries=0; - while(true) { + int tries = 0; + while (true) { if (tries > 5000) fail("Didn't react on watch"); if (tenantRepository.getAllTenantNames().containsAll(expectedTenants)) { break; @@ -212,7 +212,7 @@ public class TenantRepositoryTest { ConfigCurator.create(new MockCurator()), Metrics.createTestMetrics(), new StripedExecutor<>(new InThreadExecutorService()), - new StripedExecutor<>(new InThreadExecutorService()), + new InThreadExecutorService(), new FileDistributionFactory(new ConfigserverConfig.Builder().build()), new InMemoryFlagSource(), new InThreadExecutorService(), diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java index 687d58fd23b..40f046b8b05 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/tenant/TestTenantRepository.java @@ -50,7 +50,7 @@ public class TestTenantRepository extends TenantRepository { ConfigCurator.create(curator), metrics, new StripedExecutor<>(new InThreadExecutorService()), - new StripedExecutor<>(new InThreadExecutorService()), + new InThreadExecutorService(), fileDistributionFactory, flagSource, new InThreadExecutorService(), |