diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2020-11-03 14:33:05 +0100 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2020-11-03 14:33:05 +0100 |
commit | ad97dd791ae12eb9d63eebcb6daa7bcfa1ad0fe4 (patch) | |
tree | 22a863dec4c774bd826cccbb9db4f54343d33762 /configserver | |
parent | 157964dd3cf677e6abb3d9ba4d58bb6e4df2d94d (diff) |
Move curator-stuff from TenantApplications to ApplicationCuratorDatabase
Diffstat (limited to 'configserver')
3 files changed, 114 insertions, 47 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java index 94eb6eeea57..4bc3066ee18 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabase.java @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.application; +import com.yahoo.concurrent.StripedExecutor; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.TenantName; import com.yahoo.path.Path; @@ -8,29 +9,113 @@ import com.yahoo.slime.Cursor; import com.yahoo.slime.Inspector; import com.yahoo.slime.Slime; import com.yahoo.slime.SlimeUtils; +import com.yahoo.text.Utf8; +import com.yahoo.transaction.Transaction; import com.yahoo.vespa.config.server.application.ApplicationReindexing.Cluster; import com.yahoo.vespa.config.server.application.ApplicationReindexing.Status; import com.yahoo.vespa.config.server.tenant.TenantRepository; import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.curator.Lock; +import com.yahoo.vespa.curator.transaction.CuratorOperations; +import com.yahoo.vespa.curator.transaction.CuratorTransaction; import com.yahoo.yolean.Exceptions; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import java.time.Duration; import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import static java.util.stream.Collectors.toUnmodifiableMap; /** - * Stores data and holds locks for application, backed by a {@link Curator}. + * Stores data and holds locks for the applications of a tenant, backed by a {@link Curator}. + * + * Each application is stored under /config/v2/tenants/<tenant>/applications/<application>, + * the root contains the currently active session, if any. Children of this node may hold more data. + * Locks for synchronising writes to these paths, and changes to the config of this application, are found + * under /config/v2/tenants/<tenant>/locks/<application>. * * @author jonmv */ public class ApplicationCuratorDatabase { + final TenantName tenant; + final Path applicationsPath; + final Path locksPath; + private final Curator curator; - public ApplicationCuratorDatabase(Curator curator) { + public ApplicationCuratorDatabase(TenantName tenant, Curator curator) { + this.tenant = tenant; + this.applicationsPath = TenantRepository.getApplicationsPath(tenant); + this.locksPath = TenantRepository.getLocksPath(tenant); this.curator = curator; } + /** Returns the lock for changing the session status of the given application. */ + public Lock lock(ApplicationId id) { + return curator.lock(lockPath(id), Duration.ofMinutes(1)); // These locks shouldn't be held for very long. + } + + public boolean exists(ApplicationId id) { + return curator.exists(applicationPath(id)); + } + + /** + * Creates a node for the given application, marking its existence. + */ + public void createApplication(ApplicationId id) { + if ( ! id.tenant().equals(tenant)) + throw new IllegalArgumentException("Cannot write application id '" + id + "' for tenant '" + tenant + "'"); + try (Lock lock = lock(id)) { + curator.create(applicationPath(id)); + } + } + + /** + * Returns a transaction which writes the given session id as the currently active for the given application. + * + * @param applicationId An {@link ApplicationId} that represents an active application. + * @param sessionId Id of the session containing the application package for this id. + */ + public Transaction createPutTransaction(ApplicationId applicationId, long sessionId) { + return new CuratorTransaction(curator).add(CuratorOperations.setData(applicationPath(applicationId).getAbsolute(), Utf8.toAsciiBytes(sessionId))); + } + + /** + * Returns a transaction which deletes this application. + */ + public CuratorTransaction createDeleteTransaction(ApplicationId applicationId) { + return CuratorTransaction.from(CuratorOperations.deleteAll(applicationPath(applicationId).getAbsolute(), curator), curator); + } + + /** + * Returns the active session id for the given application. + * Returns Optional.empty if application not found or no active session exists. + */ + public Optional<Long> activeSessionOf(ApplicationId id) { + Optional<byte[]> data = curator.getData(applicationPath(id)); + return (data.isEmpty() || data.get().length == 0) + ? Optional.empty() + : data.map(bytes -> Long.parseLong(Utf8.toString(bytes))); + } + + /** + * List the active applications of a tenant in this config server. + * + * @return a list of {@link ApplicationId}s that are active. + */ + public List<ApplicationId> activeApplications() { + return curator.getChildren(applicationsPath).stream() + .sorted() + .map(ApplicationId::fromSerializedForm) + .filter(id -> activeSessionOf(id).isPresent()) + .collect(Collectors.toUnmodifiableList()); + } + public ApplicationReindexing readReindexingStatus(ApplicationId id) { return curator.getData(reindexingDataPath(id)) .map(data -> ReindexingStatusSerializer.fromBytes(data)) @@ -41,15 +126,22 @@ public class ApplicationCuratorDatabase { curator.set(reindexingDataPath(id), ReindexingStatusSerializer.toBytes(status)); } - private static Path applicationsRoot(TenantName tenant) { - return TenantRepository.getApplicationsPath(tenant); + + /** Sets up a listenable cache with the given listener, over the applications path of this tenant. */ + public Curator.DirectoryCache createApplicationsPathCache(ExecutorService zkCacheExecutor) { + return curator.createDirectoryCache(applicationsPath.getAbsolute(), false, false, zkCacheExecutor); + } + + + private Path lockPath(ApplicationId id) { + return locksPath.append(id.serializedForm()); } - private static Path applicationPath(ApplicationId id) { - return applicationsRoot(id.tenant()).append(id.serializedForm()); + private Path applicationPath(ApplicationId id) { + return applicationsPath.append(id.serializedForm()); } - private static Path reindexingDataPath(ApplicationId id) { + private Path reindexingDataPath(ApplicationId id) { return applicationPath(id).append("reindexing"); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java index a01ce2e2cc3..7c54fd39a74 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java @@ -49,11 +49,7 @@ import java.util.stream.Collectors; import static java.util.stream.Collectors.toSet; /** - * The applications of a tenant, backed by ZooKeeper. - * - * Each application is stored under /config/v2/tenants/<tenant>/applications/<application>, - * the root contains the currently active session, if any. Locks for synchronising writes to these paths, and changes - * to the config of this application, are found under /config/v2/tenants/<tenant>/locks/<application>. + * The applications of a tenant. * * @author Ulf Lilleengen * @author jonmv @@ -62,9 +58,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica private static final Logger log = Logger.getLogger(TenantApplications.class.getName()); - private final Curator curator; - private final Path applicationsPath; - private final Path locksPath; + private final ApplicationCuratorDatabase database; private final Curator.DirectoryCache directoryCache; private final Executor zkWatcherExecutor; private final Metrics metrics; @@ -81,12 +75,10 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica ExecutorService zkCacheExecutor, Metrics metrics, ReloadListener reloadListener, ConfigserverConfig configserverConfig, HostRegistry<ApplicationId> hostRegistry, TenantFileSystemDirs tenantFileSystemDirs) { - this.curator = curator; - this.applicationsPath = TenantRepository.getApplicationsPath(tenant); - this.locksPath = TenantRepository.getLocksPath(tenant); + this.database = new ApplicationCuratorDatabase(tenant, curator); this.tenant = tenant; this.zkWatcherExecutor = command -> zkWatcherExecutor.execute(tenant, command); - this.directoryCache = curator.createDirectoryCache(applicationsPath.getAbsolute(), false, false, zkCacheExecutor); + this.directoryCache = database.createApplicationsPathCache(zkCacheExecutor); this.directoryCache.addListener(this::childEvent); this.directoryCache.start(); this.metrics = metrics; @@ -110,21 +102,20 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica new TenantFileSystemDirs(componentRegistry.getConfigServerDB(), tenantName)); } + /** The curator backed ZK storage of this. */ + public ApplicationCuratorDatabase database() { return database; } + /** * List the active applications of a tenant in this config server. * * @return a list of {@link ApplicationId}s that are active. */ public List<ApplicationId> activeApplications() { - return curator.getChildren(applicationsPath).stream() - .sorted() - .map(ApplicationId::fromSerializedForm) - .filter(id -> activeSessionOf(id).isPresent()) - .collect(Collectors.toUnmodifiableList()); + return database().activeApplications(); } public boolean exists(ApplicationId id) { - return curator.exists(applicationPath(id)); + return database().exists(id); } /** @@ -132,10 +123,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica * Returns Optional.empty if application not found or no active session exists. */ public Optional<Long> activeSessionOf(ApplicationId id) { - Optional<byte[]> data = curator.getData(applicationPath(id)); - return (data.isEmpty() || data.get().length == 0) - ? Optional.empty() - : data.map(bytes -> Long.parseLong(Utf8.toString(bytes))); + return database().activeSessionOf(id); } public boolean sessionExistsInFileSystem(long sessionId) { @@ -149,18 +137,14 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica * @param sessionId Id of the session containing the application package for this id. */ public Transaction createPutTransaction(ApplicationId applicationId, long sessionId) { - return new CuratorTransaction(curator).add(CuratorOperations.setData(applicationPath(applicationId).getAbsolute(), Utf8.toAsciiBytes(sessionId))); + return database().createPutTransaction(applicationId, sessionId); } /** * Creates a node for the given application, marking its existence. */ public void createApplication(ApplicationId id) { - if (! id.tenant().equals(tenant)) - throw new IllegalArgumentException("Cannot write application id '" + id + "' for tenant '" + tenant + "'"); - try (Lock lock = lock(id)) { - curator.create(applicationPath(id)); - } + database().createApplication(id); } /** @@ -179,7 +163,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica * Returns a transaction which deletes this application. */ public CuratorTransaction createDeleteTransaction(ApplicationId applicationId) { - return CuratorTransaction.from(CuratorOperations.deleteAll(applicationPath(applicationId).getAbsolute(), curator), curator); + return database().createDeleteTransaction(applicationId); } /** @@ -198,7 +182,7 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica /** Returns the lock for changing the session status of the given application. */ public Lock lock(ApplicationId id) { - return curator.lock(lockPath(id), Duration.ofMinutes(1)); // These locks shouldn't be held for very long. + return database().lock(id); } private void childEvent(CuratorFramework ignored, PathChildrenCacheEvent event) { @@ -232,15 +216,6 @@ public class TenantApplications implements RequestHandler, HostValidator<Applica log.log(Level.FINE, TenantRepository.logPre(applicationId) + "Application added: " + applicationId); } - // TODO jonmv: Move curator stuff to ApplicationCuratorDatabase - private Path applicationPath(ApplicationId id) { - return applicationsPath.append(id.serializedForm()); - } - - private Path lockPath(ApplicationId id) { - return locksPath.append(id.serializedForm()); - } - /** * Gets a config for the given app, or null if not found */ diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java index 96de2a8e4a4..89ee1dcccce 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/application/ApplicationCuratorDatabaseTest.java @@ -17,7 +17,7 @@ public class ApplicationCuratorDatabaseTest { @Test public void testReindexingStatusSerialization() { ApplicationId id = ApplicationId.defaultId(); - ApplicationCuratorDatabase db = new ApplicationCuratorDatabase(new MockCurator()); + ApplicationCuratorDatabase db = new ApplicationCuratorDatabase(id.tenant(), new MockCurator()); assertEquals(ApplicationReindexing.empty(), db.readReindexingStatus(id)); |