diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-06-10 09:56:29 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-06-10 09:56:29 +0200 |
commit | 8db3e940c336f4e37043e37e227835d6491198a8 (patch) | |
tree | 44286799eb2dc1fbe149352de6574957180e91a9 /configserver | |
parent | 049564145c6cd6ff9746221312b611da7eeb421b (diff) |
Move RemoteSessionRepo into SessionRepository
Diffstat (limited to 'configserver')
10 files changed, 364 insertions, 454 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java index c7d1b4b1cdd..af5a561aa6e 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/ApplicationRepository.java @@ -52,7 +52,6 @@ import com.yahoo.vespa.config.server.session.LocalSession; import com.yahoo.vespa.config.server.session.SessionRepository; import com.yahoo.vespa.config.server.session.PrepareParams; import com.yahoo.vespa.config.server.session.RemoteSession; -import com.yahoo.vespa.config.server.session.RemoteSessionRepo; import com.yahoo.vespa.config.server.session.Session; import com.yahoo.vespa.config.server.session.SessionFactory; import com.yahoo.vespa.config.server.session.SilentDeployLogger; @@ -489,7 +488,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye Tenant tenant = tenantRepository.getTenant(applicationId.tenant()); if (tenant == null) throw new NotFoundException("Tenant '" + applicationId.tenant() + "' not found"); long sessionId = getSessionIdForApplication(tenant, applicationId); - RemoteSession session = tenant.getRemoteSessionRepo().getSession(sessionId); + RemoteSession session = tenant.getSessionRepo().getRemoteSession(sessionId); if (session == null) throw new NotFoundException("Remote session " + sessionId + " not found"); return session.ensureApplicationLoaded().getForVersionOrLatest(version, clock.instant()); } catch (NotFoundException e) { @@ -526,10 +525,10 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye } private boolean localSessionHasBeenDeleted(ApplicationId applicationId, long sessionId, Duration waitTime) { - RemoteSessionRepo remoteSessionRepo = tenantRepository.getTenant(applicationId.tenant()).getRemoteSessionRepo(); + SessionRepository sessionRepository = tenantRepository.getTenant(applicationId.tenant()).getSessionRepo(); Instant end = Instant.now().plus(waitTime); do { - if (remoteSessionRepo.getSession(sessionId) == null) return true; + if (sessionRepository.getRemoteSession(sessionId) == null) return true; try { Thread.sleep(10); } catch (InterruptedException e) { /* ignored */} } while (Instant.now().isBefore(end)); @@ -687,7 +686,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { return tenantRepository.getAllTenants() .stream() - .map(tenant -> tenant.getRemoteSessionRepo().deleteExpiredSessions(clock, expiryTime)) + .map(tenant -> tenant.getSessionRepo().deleteExpiredRemoteSessions(clock, expiryTime)) .mapToInt(i -> i) .sum(); } @@ -754,7 +753,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye } private RemoteSession getRemoteSession(Tenant tenant, long sessionId) { - RemoteSession session = tenant.getRemoteSessionRepo().getSession(sessionId); + RemoteSession session = tenant.getSessionRepo().getRemoteSession(sessionId); if (session == null) throw new NotFoundException("Session " + sessionId + " was not found"); return session; @@ -805,7 +804,7 @@ public class ApplicationRepository implements com.yahoo.config.provision.Deploye private RemoteSession getActiveSession(Tenant tenant, ApplicationId applicationId) { TenantApplications applicationRepo = tenant.getApplicationRepo(); if (applicationRepo.activeApplications().contains(applicationId)) { - return tenant.getRemoteSessionRepo().getSession(applicationRepo.requireActiveSessionOf(applicationId)); + return tenant.getSessionRepo().getRemoteSession(applicationRepo.requireActiveSessionOf(applicationId)); } return null; } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java index a4dfec708d6..d4fe35c14b1 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/TenantApplications.java @@ -213,7 +213,7 @@ public class TenantApplications implements RequestHandler, ReloadHandler, HostVa break; } // We may have lost events and may need to remove applications. - // New applications are added when session is added, not here. See RemoteSessionRepo. + // New applications are added when session is added, not here. See SessionRepository. removeUnusedApplications(); }); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSessionRepo.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSessionRepo.java deleted file mode 100644 index 0e538b05931..00000000000 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSessionRepo.java +++ /dev/null @@ -1,241 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.server.session; - -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; -import com.yahoo.concurrent.StripedExecutor; -import com.yahoo.config.provision.ApplicationId; -import com.yahoo.config.provision.TenantName; - -import java.time.Clock; -import java.util.logging.Level; -import com.yahoo.path.Path; -import com.yahoo.vespa.config.server.GlobalComponentRegistry; -import com.yahoo.vespa.config.server.ReloadHandler; -import com.yahoo.vespa.config.server.application.TenantApplications; -import com.yahoo.vespa.config.server.monitoring.MetricUpdater; -import com.yahoo.vespa.config.server.monitoring.Metrics; -import com.yahoo.vespa.config.server.tenant.TenantRepository; -import com.yahoo.vespa.config.server.zookeeper.ConfigCurator; -import com.yahoo.vespa.curator.Curator; -import com.yahoo.vespa.flags.BooleanFlag; -import com.yahoo.vespa.flags.FlagSource; -import com.yahoo.vespa.flags.Flags; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -/** - * Session repository for RemoteSessions. There is one such repo per tenant. - * Will watch/prepare sessions (applications) based on watched nodes in ZooKeeper. The zookeeper state watched in - * this class is shared between all config servers, so it should not modify any global state, because the operation - * will be performed on all servers. The repo can be regarded as read only from the POV of the configserver. - * - * @author Vegard Havdal - * @author Ulf Lilleengen - */ -public class RemoteSessionRepo { - - private static final Logger log = Logger.getLogger(RemoteSessionRepo.class.getName()); - - private final Curator curator; - private final Path sessionsPath; - private final SessionFactory sessionFactory; - private final Map<Long, RemoteSessionStateWatcher> sessionStateWatchers = new HashMap<>(); - private final ReloadHandler reloadHandler; - private final TenantName tenantName; - private final MetricUpdater metrics; - private final BooleanFlag distributeApplicationPackage; - private final Curator.DirectoryCache directoryCache; - private final TenantApplications applicationRepo; - private final Executor zkWatcherExecutor; - private final SessionCache<RemoteSession> sessionCache; - - public RemoteSessionRepo(GlobalComponentRegistry componentRegistry, - SessionFactory sessionFactory, - ReloadHandler reloadHandler, - TenantName tenantName, - TenantApplications applicationRepo, - FlagSource flagSource) { - this.sessionCache = new SessionCache<>(); - this.curator = componentRegistry.getCurator(); - this.sessionsPath = TenantRepository.getSessionsPath(tenantName); - this.applicationRepo = applicationRepo; - this.sessionFactory = sessionFactory; - this.reloadHandler = reloadHandler; - this.tenantName = tenantName; - this.metrics = componentRegistry.getMetrics().getOrCreateMetricUpdater(Metrics.createDimensions(tenantName)); - this.distributeApplicationPackage = Flags.CONFIGSERVER_DISTRIBUTE_APPLICATION_PACKAGE.bindTo(flagSource); - StripedExecutor<TenantName> zkWatcherExecutor = componentRegistry.getZkWatcherExecutor(); - this.zkWatcherExecutor = command -> zkWatcherExecutor.execute(tenantName, command); - initializeSessions(); - this.directoryCache = curator.createDirectoryCache(sessionsPath.getAbsolute(), false, false, componentRegistry.getZkCacheExecutor()); - this.directoryCache.addListener(this::childEvent); - this.directoryCache.start(); - } - - public RemoteSession getSession(long sessionId) { - return sessionCache.getSession(sessionId); - } - - public List<Long> getSessions() { - return getSessionList(curator.getChildren(sessionsPath)); - } - - public void addSession(RemoteSession session) { - sessionCache.addSession(session); - metrics.incAddedSessions(); - } - - public int deleteExpiredSessions(Clock clock, Duration expiryTime) { - int deleted = 0; - for (long sessionId : getSessions()) { - RemoteSession session = sessionCache.getSession(sessionId); - if (session == null) continue; // Internal sessions not in synch with zk, continue - if (session.getStatus() == Session.Status.ACTIVATE) continue; - if (sessionHasExpired(session.getCreateTime(), expiryTime, clock)) { - log.log(Level.INFO, "Remote session " + sessionId + " for " + tenantName + " has expired, deleting it"); - session.delete(); - deleted++; - } - } - return deleted; - } - - private boolean sessionHasExpired(Instant created, Duration expiryTime, Clock clock) { - return (created.plus(expiryTime).isBefore(clock.instant())); - } - - private List<Long> getSessionListFromDirectoryCache(List<ChildData> children) { - return getSessionList(children.stream() - .map(child -> Path.fromString(child.getPath()).getName()) - .collect(Collectors.toList())); - } - - private List<Long> getSessionList(List<String> children) { - return children.stream().map(Long::parseLong).collect(Collectors.toList()); - } - - private void initializeSessions() throws NumberFormatException { - getSessions().forEach(this::sessionAdded); - } - - private synchronized void sessionsChanged() throws NumberFormatException { - List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData()); - checkForRemovedSessions(sessions); - checkForAddedSessions(sessions); - } - - private void checkForRemovedSessions(List<Long> sessions) { - for (RemoteSession session : sessionCache.getSessions()) - if ( ! sessions.contains(session.getSessionId())) - sessionRemoved(session.getSessionId()); - } - - private void checkForAddedSessions(List<Long> sessions) { - for (Long sessionId : sessions) - if (sessionCache.getSession(sessionId) == null) - sessionAdded(sessionId); - } - - /** - * A session for which we don't have a watcher, i.e. hitherto unknown to us. - * - * @param sessionId session id for the new session - */ - private void sessionAdded(long sessionId) { - log.log(Level.FINE, () -> "Adding session to RemoteSessionRepo: " + sessionId); - RemoteSession session = sessionFactory.createRemoteSession(sessionId); - Path sessionPath = sessionsPath.append(String.valueOf(sessionId)); - Curator.FileCache fileCache = curator.createFileCache(sessionPath.append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH).getAbsolute(), false); - fileCache.addListener(this::nodeChanged); - loadSessionIfActive(session); - addSession(session); - sessionStateWatchers.put(sessionId, new RemoteSessionStateWatcher(fileCache, reloadHandler, session, metrics, zkWatcherExecutor)); - if (distributeApplicationPackage.value()) - sessionFactory.createLocalSessionUsingDistributedApplicationPackage(sessionId); - } - - private void sessionRemoved(long sessionId) { - RemoteSessionStateWatcher watcher = sessionStateWatchers.remove(sessionId); - if (watcher != null) watcher.close(); - sessionCache.removeSession(sessionId); - metrics.incRemovedSessions(); - } - - private void loadSessionIfActive(RemoteSession session) { - for (ApplicationId applicationId : applicationRepo.activeApplications()) { - if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) { - log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it"); - reloadHandler.reloadConfig(session.ensureApplicationLoaded()); - log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")"); - return; - } - } - } - - public synchronized void close() { - try { - if (directoryCache != null) { - directoryCache.close(); - } - } catch (Exception e) { - log.log(Level.WARNING, "Exception when closing path cache", e); - } finally { - checkForRemovedSessions(new ArrayList<>()); - } - } - - private void nodeChanged() { - zkWatcherExecutor.execute(() -> { - Multiset<Session.Status> sessionMetrics = HashMultiset.create(); - for (RemoteSession session : sessionCache.getSessions()) { - sessionMetrics.add(session.getStatus()); - } - metrics.setNewSessions(sessionMetrics.count(Session.Status.NEW)); - metrics.setPreparedSessions(sessionMetrics.count(Session.Status.PREPARE)); - metrics.setActivatedSessions(sessionMetrics.count(Session.Status.ACTIVATE)); - metrics.setDeactivatedSessions(sessionMetrics.count(Session.Status.DEACTIVATE)); - }); - } - - @SuppressWarnings("unused") - private void childEvent(CuratorFramework ignored, PathChildrenCacheEvent event) { - zkWatcherExecutor.execute(() -> { - log.log(Level.FINE, () -> "Got child event: " + event); - switch (event.getType()) { - case CHILD_ADDED: - sessionsChanged(); - synchronizeOnNew(getSessionListFromDirectoryCache(Collections.singletonList(event.getData()))); - break; - case CHILD_REMOVED: - sessionsChanged(); - break; - case CONNECTION_RECONNECTED: - sessionsChanged(); - break; - } - }); - } - - private void synchronizeOnNew(List<Long> sessionList) { - for (long sessionId : sessionList) { - RemoteSession session = sessionCache.getSession(sessionId); - if (session == null) continue; // session might have been deleted after getting session list - log.log(Level.FINE, () -> session.logPre() + "Confirming upload for session " + sessionId); - session.confirmUpload(); - } - } - -} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index 2390c05d84a..cd3698eed78 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -1,15 +1,27 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.session; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.TenantName; import com.yahoo.path.Path; import com.yahoo.transaction.NestedTransaction; import com.yahoo.vespa.config.server.GlobalComponentRegistry; +import com.yahoo.vespa.config.server.ReloadHandler; +import com.yahoo.vespa.config.server.application.TenantApplications; import com.yahoo.vespa.config.server.deploy.TenantFileSystemDirs; +import com.yahoo.vespa.config.server.monitoring.MetricUpdater; +import com.yahoo.vespa.config.server.monitoring.Metrics; import com.yahoo.vespa.config.server.tenant.TenantRepository; import com.yahoo.vespa.config.server.zookeeper.ConfigCurator; import com.yahoo.vespa.curator.Curator; +import com.yahoo.vespa.flags.BooleanFlag; +import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.flags.Flags; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import java.io.File; import java.io.FilenameFilter; @@ -17,12 +29,14 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * @@ -30,47 +44,76 @@ import java.util.logging.Logger; * different session types (RemoteSession and LocalSession). * * @author Ulf Lilleengen + * @author hmusum + * */ public class SessionRepository { private static final Logger log = Logger.getLogger(SessionRepository.class.getName()); private static final FilenameFilter sessionApplicationsFilter = (dir, name) -> name.matches("\\d+"); - private final SessionCache<LocalSession> sessionCache; - private final Map<Long, LocalSessionStateWatcher> sessionStateWatchers = new HashMap<>(); + private final SessionCache<LocalSession> localSessionCache; + + private final SessionCache<RemoteSession> remoteSessionCache = new SessionCache<>(); + private final Map<Long, LocalSessionStateWatcher> localSessionStateWatchers = new HashMap<>(); + private final Map<Long, RemoteSessionStateWatcher> remoteSessionStateWatchers = new HashMap<>(); private final Duration sessionLifetime; private final Clock clock; private final Curator curator; private final Executor zkWatcherExecutor; private final TenantFileSystemDirs tenantFileSystemDirs; + private final BooleanFlag distributeApplicationPackage; + private final ReloadHandler reloadHandler; + private final MetricUpdater metrics; + private final Curator.DirectoryCache directoryCache; + private final TenantApplications applicationRepo; + private final Path sessionsPath; + private final SessionFactory sessionFactory; + private final TenantName tenantName; - public SessionRepository(TenantName tenantName, GlobalComponentRegistry componentRegistry, SessionFactory sessionFactory) { - sessionCache = new SessionCache<>(); + public SessionRepository(TenantName tenantName, GlobalComponentRegistry componentRegistry, + SessionFactory sessionFactory, TenantApplications applicationRepo, + ReloadHandler reloadHandler, FlagSource flagSource) { + this.tenantName = tenantName; + localSessionCache = new SessionCache<>(); this.clock = componentRegistry.getClock(); this.curator = componentRegistry.getCurator(); this.sessionLifetime = Duration.ofSeconds(componentRegistry.getConfigserverConfig().sessionLifetime()); this.zkWatcherExecutor = command -> componentRegistry.getZkWatcherExecutor().execute(tenantName, command); this.tenantFileSystemDirs = new TenantFileSystemDirs(componentRegistry.getConfigServerDB(), tenantName); - loadSessions(sessionFactory); + this.sessionFactory = sessionFactory; + this.applicationRepo = applicationRepo; + loadLocalSessions(sessionFactory); + + this.distributeApplicationPackage = Flags.CONFIGSERVER_DISTRIBUTE_APPLICATION_PACKAGE.bindTo(flagSource); + this.reloadHandler = reloadHandler; + this.sessionsPath = TenantRepository.getSessionsPath(tenantName); + this.metrics = componentRegistry.getMetrics().getOrCreateMetricUpdater(Metrics.createDimensions(tenantName)); + initializeRemoteSessions(); + this.directoryCache = curator.createDirectoryCache(sessionsPath.getAbsolute(), false, false, componentRegistry.getZkCacheExecutor()); + this.directoryCache.addListener(this::childEvent); + this.directoryCache.start(); } + // ---------------- Local sessions ---------------------------------------------------------------- + public synchronized void addSession(LocalSession session) { - sessionCache.addSession(session); + localSessionCache.addSession(session); Path sessionsPath = TenantRepository.getSessionsPath(session.getTenantName()); long sessionId = session.getSessionId(); Curator.FileCache fileCache = curator.createFileCache(sessionsPath.append(String.valueOf(sessionId)).append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH).getAbsolute(), false); - sessionStateWatchers.put(sessionId, new LocalSessionStateWatcher(fileCache, session, this, zkWatcherExecutor)); + localSessionStateWatchers.put(sessionId, new LocalSessionStateWatcher(fileCache, session, this, zkWatcherExecutor)); } public LocalSession getSession(long sessionId) { - return sessionCache.getSession(sessionId); + return localSessionCache.getSession(sessionId); } public List<LocalSession> getSessions() { - return sessionCache.getSessions(); + return localSessionCache.getSessions(); } - private void loadSessions(SessionFactory sessionFactory) { + private void loadLocalSessions(SessionFactory sessionFactory) { File[] sessions = tenantFileSystemDirs.sessionsPath().listFiles(sessionApplicationsFilter); if (sessions == null) { return; @@ -88,7 +131,7 @@ public class SessionRepository { public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) { log.log(Level.FINE, "Purging old sessions"); try { - for (LocalSession candidate : sessionCache.getSessions()) { + for (LocalSession candidate : localSessionCache.getSessions()) { Instant createTime = candidate.getCreateTime(); log.log(Level.FINE, "Candidate session for deletion: " + candidate.getSessionId() + ", created: " + createTime); @@ -124,9 +167,9 @@ public class SessionRepository { public void deleteSession(LocalSession session) { long sessionId = session.getSessionId(); log.log(Level.FINE, "Deleting local session " + sessionId); - LocalSessionStateWatcher watcher = sessionStateWatchers.remove(sessionId); + LocalSessionStateWatcher watcher = localSessionStateWatchers.remove(sessionId); if (watcher != null) watcher.close(); - sessionCache.removeSession(sessionId); + localSessionCache.removeSession(sessionId); NestedTransaction transaction = new NestedTransaction(); session.delete(transaction); transaction.commit(); @@ -135,15 +178,167 @@ public class SessionRepository { public void close() { deleteAllSessions(); tenantFileSystemDirs.delete(); + try { + if (directoryCache != null) { + directoryCache.close(); + } + } catch (Exception e) { + log.log(Level.WARNING, "Exception when closing path cache", e); + } finally { + checkForRemovedSessions(new ArrayList<>()); + } } private void deleteAllSessions() { - List<LocalSession> sessions = new ArrayList<>(sessionCache.getSessions()); + List<LocalSession> sessions = new ArrayList<>(localSessionCache.getSessions()); for (LocalSession session : sessions) { deleteSession(session); } } + // ---------------- Remote sessions ---------------------------------------------------------------- + + public RemoteSession getRemoteSession(long sessionId) { + return remoteSessionCache.getSession(sessionId); + } + + public List<Long> getRemoteSessions() { + return getSessionList(curator.getChildren(sessionsPath)); + } + + public void addSession(RemoteSession session) { + remoteSessionCache.addSession(session); + metrics.incAddedSessions(); + } + + public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { + int deleted = 0; + for (long sessionId : getRemoteSessions()) { + RemoteSession session = remoteSessionCache.getSession(sessionId); + if (session == null) continue; // Internal sessions not in synch with zk, continue + if (session.getStatus() == Session.Status.ACTIVATE) continue; + if (sessionHasExpired(session.getCreateTime(), expiryTime, clock)) { + log.log(Level.INFO, "Remote session " + sessionId + " for " + tenantName + " has expired, deleting it"); + session.delete(); + deleted++; + } + } + return deleted; + } + + private boolean sessionHasExpired(Instant created, Duration expiryTime, Clock clock) { + return (created.plus(expiryTime).isBefore(clock.instant())); + } + + private List<Long> getSessionListFromDirectoryCache(List<ChildData> children) { + return getSessionList(children.stream() + .map(child -> Path.fromString(child.getPath()).getName()) + .collect(Collectors.toList())); + } + + private List<Long> getSessionList(List<String> children) { + return children.stream().map(Long::parseLong).collect(Collectors.toList()); + } + + private void initializeRemoteSessions() throws NumberFormatException { + getRemoteSessions().forEach(this::sessionAdded); + } + + private synchronized void sessionsChanged() throws NumberFormatException { + List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData()); + checkForRemovedSessions(sessions); + checkForAddedSessions(sessions); + } + + private void checkForRemovedSessions(List<Long> sessions) { + for (RemoteSession session : remoteSessionCache.getSessions()) + if ( ! sessions.contains(session.getSessionId())) + sessionRemoved(session.getSessionId()); + } + + private void checkForAddedSessions(List<Long> sessions) { + for (Long sessionId : sessions) + if (remoteSessionCache.getSession(sessionId) == null) + sessionAdded(sessionId); + } + + /** + * A session for which we don't have a watcher, i.e. hitherto unknown to us. + * + * @param sessionId session id for the new session + */ + private void sessionAdded(long sessionId) { + log.log(Level.FINE, () -> "Adding session to SessionRepository: " + sessionId); + RemoteSession session = sessionFactory.createRemoteSession(sessionId); + Path sessionPath = sessionsPath.append(String.valueOf(sessionId)); + Curator.FileCache fileCache = curator.createFileCache(sessionPath.append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH).getAbsolute(), false); + fileCache.addListener(this::nodeChanged); + loadSessionIfActive(session); + addSession(session); + remoteSessionStateWatchers.put(sessionId, new RemoteSessionStateWatcher(fileCache, reloadHandler, session, metrics, zkWatcherExecutor)); + if (distributeApplicationPackage.value()) + sessionFactory.createLocalSessionUsingDistributedApplicationPackage(sessionId); + } + + private void sessionRemoved(long sessionId) { + RemoteSessionStateWatcher watcher = remoteSessionStateWatchers.remove(sessionId); + if (watcher != null) watcher.close(); + remoteSessionCache.removeSession(sessionId); + metrics.incRemovedSessions(); + } + + private void loadSessionIfActive(RemoteSession session) { + for (ApplicationId applicationId : applicationRepo.activeApplications()) { + if (applicationRepo.requireActiveSessionOf(applicationId) == session.getSessionId()) { + log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it"); + reloadHandler.reloadConfig(session.ensureApplicationLoaded()); + log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")"); + return; + } + } + } + + private void nodeChanged() { + zkWatcherExecutor.execute(() -> { + Multiset<Session.Status> sessionMetrics = HashMultiset.create(); + for (RemoteSession session : remoteSessionCache.getSessions()) { + sessionMetrics.add(session.getStatus()); + } + metrics.setNewSessions(sessionMetrics.count(Session.Status.NEW)); + metrics.setPreparedSessions(sessionMetrics.count(Session.Status.PREPARE)); + metrics.setActivatedSessions(sessionMetrics.count(Session.Status.ACTIVATE)); + metrics.setDeactivatedSessions(sessionMetrics.count(Session.Status.DEACTIVATE)); + }); + } + + @SuppressWarnings("unused") + private void childEvent(CuratorFramework ignored, PathChildrenCacheEvent event) { + zkWatcherExecutor.execute(() -> { + log.log(Level.FINE, () -> "Got child event: " + event); + switch (event.getType()) { + case CHILD_ADDED: + sessionsChanged(); + synchronizeOnNew(getSessionListFromDirectoryCache(Collections.singletonList(event.getData()))); + break; + case CHILD_REMOVED: + sessionsChanged(); + break; + case CONNECTION_RECONNECTED: + sessionsChanged(); + break; + } + }); + } + + private void synchronizeOnNew(List<Long> sessionList) { + for (long sessionId : sessionList) { + RemoteSession session = remoteSessionCache.getSession(sessionId); + if (session == null) continue; // session might have been deleted after getting session list + log.log(Level.FINE, () -> session.logPre() + "Confirming upload for session " + sessionId); + session.confirmUpload(); + } + } + @Override public String toString() { return getSessions().toString(); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java index fb8af523bae..097c3146b81 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenant.java @@ -7,7 +7,6 @@ import com.yahoo.vespa.config.server.ReloadHandler; import com.yahoo.vespa.config.server.RequestHandler; import com.yahoo.vespa.config.server.application.TenantApplications; import com.yahoo.vespa.config.server.session.SessionRepository; -import com.yahoo.vespa.config.server.session.RemoteSessionRepo; import com.yahoo.vespa.config.server.session.SessionFactory; import com.yahoo.vespa.curator.Curator; import org.apache.zookeeper.data.Stat; @@ -28,7 +27,6 @@ public class Tenant implements TenantHandlerProvider { static final String APPLICATIONS = "applications"; private final TenantName name; - private final RemoteSessionRepo remoteSessionRepo; private final Path path; private final SessionFactory sessionFactory; private final SessionRepository sessionRepository; @@ -40,7 +38,6 @@ public class Tenant implements TenantHandlerProvider { Tenant(TenantName name, SessionFactory sessionFactory, SessionRepository sessionRepository, - RemoteSessionRepo remoteSessionRepo, RequestHandler requestHandler, ReloadHandler reloadHandler, TenantApplications applicationRepo, @@ -49,7 +46,6 @@ public class Tenant implements TenantHandlerProvider { this.path = TenantRepository.getTenantPath(name); this.requestHandler = requestHandler; this.reloadHandler = reloadHandler; - this.remoteSessionRepo = remoteSessionRepo; this.sessionFactory = sessionFactory; this.sessionRepository = sessionRepository; this.applicationRepo = applicationRepo; @@ -74,13 +70,8 @@ public class Tenant implements TenantHandlerProvider { return requestHandler; } - /** - * The RemoteSessionRepo for this - * - * @return repo - */ - public RemoteSessionRepo getRemoteSessionRepo() { - return remoteSessionRepo; + public SessionRepository getSessionRepo() { + return sessionRepository; } public TenantName getName() { @@ -140,7 +131,7 @@ public class Tenant implements TenantHandlerProvider { * Called by watchers as a reaction to {@link #delete()}. */ void close() { - remoteSessionRepo.close(); // Closes watchers and clears memory. + sessionRepository.close(); // Closes watchers and clears memory. applicationRepo.close(); // Closes watchers. sessionRepository.close(); // Closes watchers, clears memory, and deletes local files and ZK session state. } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java index 44c16f2013d..caf9982e034 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/TenantRepository.java @@ -15,7 +15,6 @@ import com.yahoo.vespa.config.server.application.TenantApplications; import com.yahoo.vespa.config.server.deploy.TenantFileSystemDirs; import com.yahoo.vespa.config.server.monitoring.MetricUpdater; import com.yahoo.vespa.config.server.session.SessionRepository; -import com.yahoo.vespa.config.server.session.RemoteSessionRepo; import com.yahoo.vespa.config.server.session.SessionFactory; import com.yahoo.vespa.curator.Curator; import org.apache.curator.framework.CuratorFramework; @@ -224,15 +223,11 @@ public class TenantRepository { if (reloadHandler == null) reloadHandler = applicationRepo; SessionFactory sessionFactory = new SessionFactory(componentRegistry, applicationRepo, applicationRepo, tenantName); - SessionRepository sessionRepository = new SessionRepository(tenantName, componentRegistry, sessionFactory); - RemoteSessionRepo remoteSessionRepo = new RemoteSessionRepo(componentRegistry, - sessionFactory, - reloadHandler, - tenantName, - applicationRepo, + SessionRepository sessionRepository = new SessionRepository(tenantName, componentRegistry, sessionFactory, + applicationRepo, reloadHandler, componentRegistry.getFlagSource()); log.log(Level.INFO, "Creating tenant '" + tenantName + "'"); - Tenant tenant = new Tenant(tenantName, sessionFactory, sessionRepository, remoteSessionRepo, requestHandler, + Tenant tenant = new Tenant(tenantName, sessionFactory, sessionRepository, requestHandler, reloadHandler, applicationRepo, componentRegistry.getCurator()); notifyNewTenant(tenant); tenants.putIfAbsent(tenantName, tenant); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java index 77ecf3f7fdd..d609f758c68 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/ApplicationRepositoryTest.java @@ -298,14 +298,14 @@ public class ApplicationRepositoryTest { LocalSession applicationData = tenant.getSessionRepository().getSession(sessionId); assertNotNull(applicationData); assertNotNull(applicationData.getApplicationId()); - assertNotNull(tenant.getRemoteSessionRepo().getSession(sessionId)); + assertNotNull(tenant.getSessionRepo().getSession(sessionId)); assertNotNull(applicationRepository.getActiveSession(applicationId())); // Delete app and verify that it has been deleted from repos and provisioner assertTrue(applicationRepository.delete(applicationId())); assertNull(applicationRepository.getActiveSession(applicationId())); assertNull(tenant.getSessionRepository().getSession(sessionId)); - assertNull(tenant.getRemoteSessionRepo().getSession(sessionId)); + assertNull(tenant.getSessionRepo().getSession(sessionId)); assertTrue(provisioner.removed); assertEquals(tenant.getName(), provisioner.lastApplicationId.tenant()); assertEquals(applicationId(), provisioner.lastApplicationId); @@ -346,7 +346,7 @@ public class ApplicationRepositoryTest { RemoteSession activeSession = applicationRepository.getActiveSession(applicationId()); assertNull(activeSession); Tenant tenant = tenantRepository.getTenant(applicationId().tenant()); - assertNull(tenant.getRemoteSessionRepo().getSession(prepareResult.sessionId())); + assertNull(tenant.getSessionRepo().getSession(prepareResult.sessionId())); assertTrue(applicationRepository.delete(applicationId())); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java index 362b92a8d85..ae113969ef4 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/HostHandlerTest.java @@ -56,7 +56,7 @@ public class HostHandlerTest { TestComponentRegistry componentRegistry = new TestComponentRegistry.Builder() .modelFactoryRegistry(new ModelFactoryRegistry(Collections.singletonList(new VespaModelFactory(new NullConfigModelRegistry())))) .build(); - tenant.getRemoteSessionRepo().addSession(new RemoteSession(tenant.getName(), sessionId, componentRegistry, new MockSessionZKClient(app))); + tenant.getSessionRepo().addSession(new RemoteSession(tenant.getName(), sessionId, componentRegistry, new MockSessionZKClient(app))); } @Before diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/session/RemoteSessionRepoTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/session/RemoteSessionRepoTest.java deleted file mode 100644 index 468dd5a15a7..00000000000 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/session/RemoteSessionRepoTest.java +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.config.server.session; - -import com.yahoo.config.provision.TenantName; -import com.yahoo.path.Path; -import com.yahoo.text.Utf8; -import com.yahoo.vespa.config.server.TestComponentRegistry; -import com.yahoo.vespa.config.server.tenant.Tenant; -import com.yahoo.vespa.config.server.tenant.TenantRepository; -import com.yahoo.vespa.config.server.zookeeper.ConfigCurator; -import com.yahoo.vespa.curator.Curator; -import com.yahoo.vespa.curator.mock.MockCurator; -import org.junit.Before; -import org.junit.Test; - -import java.time.Duration; -import java.time.Instant; -import java.util.function.LongPredicate; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - -/** - * @author Ulf Lilleengen - */ -public class RemoteSessionRepoTest { - - private static final TenantName tenantName = TenantName.defaultName(); - - private RemoteSessionRepo remoteSessionRepo; - private Curator curator; - TenantRepository tenantRepository; - - @Before - public void setupFacade() { - curator = new MockCurator(); - TestComponentRegistry componentRegistry = new TestComponentRegistry.Builder() - .curator(curator) - .build(); - tenantRepository = new TenantRepository(componentRegistry, false); - tenantRepository.addTenant(tenantName); - this.remoteSessionRepo = tenantRepository.getTenant(tenantName).getRemoteSessionRepo(); - curator.create(TenantRepository.getTenantPath(tenantName).append("/applications")); - curator.create(TenantRepository.getSessionsPath(tenantName)); - createSession(1L, false); - createSession(2L, false); - } - - private void createSession(long sessionId, boolean wait) { - createSession(sessionId, wait, tenantName); - } - - private void createSession(long sessionId, boolean wait, TenantName tenantName) { - Path sessionsPath = TenantRepository.getSessionsPath(tenantName); - SessionZooKeeperClient zkc = new SessionZooKeeperClient(curator, sessionsPath.append(String.valueOf(sessionId))); - zkc.createNewSession(Instant.now()); - if (wait) { - Curator.CompletionWaiter waiter = zkc.getUploadWaiter(); - waiter.awaitCompletion(Duration.ofSeconds(120)); - } - } - - @Test - public void testInitialize() { - assertSessionExists(1L); - assertSessionExists(2L); - } - - @Test - public void testCreateSession() { - createSession(3L, true); - assertSessionExists(3L); - } - - @Test - public void testSessionStateChange() throws Exception { - long sessionId = 3L; - createSession(sessionId, true); - assertSessionStatus(sessionId, Session.Status.NEW); - assertStatusChange(sessionId, Session.Status.PREPARE); - assertStatusChange(sessionId, Session.Status.ACTIVATE); - - Path session = TenantRepository.getSessionsPath(tenantName).append("" + sessionId); - curator.delete(session); - assertSessionRemoved(sessionId); - assertNull(remoteSessionRepo.getSession(sessionId)); - } - - // If reading a session throws an exception it should be handled and not prevent other applications - // from loading. In this test we just show that we end up with one session in remote session - // repo even if it had bad data (by making getSessionIdForApplication() in FailingTenantApplications - // throw an exception). - @Test - public void testBadApplicationRepoOnActivate() { - long sessionId = 3L; - TenantName mytenant = TenantName.from("mytenant"); - curator.set(TenantRepository.getApplicationsPath(mytenant).append("mytenant:appX:default"), new byte[0]); // Invalid data - tenantRepository.addTenant(mytenant); - Tenant tenant = tenantRepository.getTenant(mytenant); - curator.create(TenantRepository.getSessionsPath(mytenant)); - remoteSessionRepo = tenant.getRemoteSessionRepo(); - assertThat(remoteSessionRepo.getSessions().size(), is(0)); - createSession(sessionId, true, mytenant); - assertThat(remoteSessionRepo.getSessions().size(), is(1)); - } - - private void assertStatusChange(long sessionId, Session.Status status) throws Exception { - Path statePath = TenantRepository.getSessionsPath(tenantName).append("" + sessionId).append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH); - curator.create(statePath); - curator.framework().setData().forPath(statePath.getAbsolute(), Utf8.toBytes(status.toString())); - assertSessionStatus(sessionId, status); - } - - private void assertSessionRemoved(long sessionId) { - waitFor(p -> remoteSessionRepo.getSession(sessionId) == null, sessionId); - assertNull(remoteSessionRepo.getSession(sessionId)); - } - - private void assertSessionExists(long sessionId) { - assertSessionStatus(sessionId, Session.Status.NEW); - } - - private void assertSessionStatus(long sessionId, Session.Status status) { - waitFor(p -> remoteSessionRepo.getSession(sessionId) != null && - remoteSessionRepo.getSession(sessionId).getStatus() == status, sessionId); - assertNotNull(remoteSessionRepo.getSession(sessionId)); - assertThat(remoteSessionRepo.getSession(sessionId).getStatus(), is(status)); - } - - private void waitFor(LongPredicate predicate, long sessionId) { - long endTime = System.currentTimeMillis() + 60_000; - boolean ok; - do { - ok = predicate.test(sessionId); - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } while (System.currentTimeMillis() < endTime && !ok); - } - -} diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java index 1efc0c0cd84..3c1404b5048 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/session/SessionRepositoryTest.java @@ -5,12 +5,18 @@ import com.yahoo.cloud.config.ConfigserverConfig; import com.yahoo.config.model.application.provider.FilesApplicationPackage; import com.yahoo.config.provision.TenantName; import com.yahoo.io.IOUtils; +import com.yahoo.text.Utf8; import com.yahoo.vespa.config.server.GlobalComponentRegistry; import com.yahoo.vespa.config.server.TestComponentRegistry; import com.yahoo.vespa.config.server.application.TenantApplications; import com.yahoo.vespa.config.server.host.HostRegistry; import com.yahoo.vespa.config.server.http.SessionHandlerTest; +import com.yahoo.vespa.config.server.tenant.Tenant; +import com.yahoo.vespa.config.server.tenant.TenantRepository; +import com.yahoo.vespa.config.server.zookeeper.ConfigCurator; +import com.yahoo.vespa.curator.Curator; import com.yahoo.vespa.curator.mock.MockCurator; +import com.yahoo.vespa.flags.InMemoryFlagSource; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -19,9 +25,14 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; +import java.time.Instant; +import java.util.function.LongPredicate; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; /** @@ -30,7 +41,9 @@ import static org.junit.Assert.fail; public class SessionRepositoryTest { private File testApp = new File("src/test/apps/app"); - private SessionRepository repo; + private MockCurator curator; + private SessionRepository sessionRepository; + private TenantRepository tenantRepository; private static final TenantName tenantName = TenantName.defaultName(); @Rule @@ -49,44 +62,50 @@ public class SessionRepositoryTest { IOUtils.copyDirectory(testApp, sessionsPath.resolve("2").toFile()); IOUtils.copyDirectory(testApp, sessionsPath.resolve("3").toFile()); } + curator = new MockCurator(); + curator.create(TenantRepository.getTenantPath(tenantName).append("/applications")); + curator.create(TenantRepository.getSessionsPath(tenantName)); GlobalComponentRegistry globalComponentRegistry = new TestComponentRegistry.Builder() - .curator(new MockCurator()) + .curator(curator) .configServerConfig(new ConfigserverConfig.Builder() .configServerDBDir(configserverDbDir.getAbsolutePath()) .configDefinitionsDir(temporaryFolder.newFolder().getAbsolutePath()) .sessionLifetime(5) .build()) .build(); + tenantRepository = new TenantRepository(globalComponentRegistry, false); + TenantApplications applicationRepo = TenantApplications.create(globalComponentRegistry, tenantName); SessionFactory sessionFactory = new SessionFactory(globalComponentRegistry, - TenantApplications.create(globalComponentRegistry, tenantName), + applicationRepo, new HostRegistry<>(), tenantName); - repo = new SessionRepository(tenantName, globalComponentRegistry, sessionFactory); + sessionRepository = new SessionRepository(tenantName, globalComponentRegistry, sessionFactory, + applicationRepo, applicationRepo, new InMemoryFlagSource()); } @Test public void require_that_sessions_can_be_loaded_from_disk() { - assertNotNull(repo.getSession(1L)); - assertNotNull(repo.getSession(2L)); - assertNotNull(repo.getSession(3L)); - assertNull(repo.getSession(4L)); + assertNotNull(sessionRepository.getSession(1L)); + assertNotNull(sessionRepository.getSession(2L)); + assertNotNull(sessionRepository.getSession(3L)); + assertNull(sessionRepository.getSession(4L)); } @Test public void require_that_all_sessions_are_deleted() { - repo.close(); - assertNull(repo.getSession(1L)); - assertNull(repo.getSession(2L)); - assertNull(repo.getSession(3L)); + sessionRepository.close(); + assertNull(sessionRepository.getSession(1L)); + assertNull(sessionRepository.getSession(2L)); + assertNull(sessionRepository.getSession(3L)); } @Test public void require_that_sessions_belong_to_a_tenant() { // tenant is "default" - assertNotNull(repo.getSession(1L)); - assertNotNull(repo.getSession(2L)); - assertNotNull(repo.getSession(3L)); - assertNull(repo.getSession(4L)); + assertNotNull(sessionRepository.getSession(1L)); + assertNotNull(sessionRepository.getSession(2L)); + assertNotNull(sessionRepository.getSession(3L)); + assertNull(sessionRepository.getSession(4L)); // tenant is "newTenant" try { @@ -94,12 +113,109 @@ public class SessionRepositoryTest { } catch (Exception e) { fail(); } - assertNull(repo.getSession(1L)); + assertNull(sessionRepository.getSession(1L)); - repo.addSession(new SessionHandlerTest.MockLocalSession(1L, FilesApplicationPackage.fromFile(testApp))); - repo.addSession(new SessionHandlerTest.MockLocalSession(2L, FilesApplicationPackage.fromFile(testApp))); - assertNotNull(repo.getSession(1L)); - assertNotNull(repo.getSession(2L)); - assertNull(repo.getSession(3L)); + sessionRepository.addSession(new SessionHandlerTest.MockLocalSession(1L, FilesApplicationPackage.fromFile(testApp))); + sessionRepository.addSession(new SessionHandlerTest.MockLocalSession(2L, FilesApplicationPackage.fromFile(testApp))); + assertNotNull(sessionRepository.getSession(1L)); + assertNotNull(sessionRepository.getSession(2L)); + assertNull(sessionRepository.getSession(3L)); } + + private void createSession(long sessionId, boolean wait) { + createSession(sessionId, wait, tenantName); + } + + private void createSession(long sessionId, boolean wait, TenantName tenantName) { + com.yahoo.path.Path sessionsPath = TenantRepository.getSessionsPath(tenantName); + SessionZooKeeperClient zkc = new SessionZooKeeperClient(curator, sessionsPath.append(String.valueOf(sessionId))); + zkc.createNewSession(Instant.now()); + if (wait) { + Curator.CompletionWaiter waiter = zkc.getUploadWaiter(); + waiter.awaitCompletion(Duration.ofSeconds(120)); + } + } + + @Test + public void testInitialize() { + createSession(10L, false); + createSession(11L, false); + assertSessionExists(10L); + assertSessionExists(11L); + } + + @Test + public void testCreateSession() { + createSession(12L, true); + assertSessionExists(12L); + } + + @Test + public void testSessionStateChange() throws Exception { + long sessionId = 3L; + createSession(sessionId, true); + assertSessionStatus(sessionId, Session.Status.NEW); + assertStatusChange(sessionId, Session.Status.PREPARE); + assertStatusChange(sessionId, Session.Status.ACTIVATE); + + com.yahoo.path.Path session = TenantRepository.getSessionsPath(tenantName).append("" + sessionId); + curator.delete(session); + assertSessionRemoved(sessionId); + assertNull(sessionRepository.getRemoteSession(sessionId)); + } + + // If reading a session throws an exception it should be handled and not prevent other applications + // from loading. In this test we just show that we end up with one session in remote session + // repo even if it had bad data (by making getSessionIdForApplication() in FailingTenantApplications + // throw an exception). + @Test + public void testBadApplicationRepoOnActivate() { + long sessionId = 3L; + TenantName mytenant = TenantName.from("mytenant"); + curator.set(TenantRepository.getApplicationsPath(mytenant).append("mytenant:appX:default"), new byte[0]); // Invalid data + tenantRepository.addTenant(mytenant); + Tenant tenant = tenantRepository.getTenant(mytenant); + curator.create(TenantRepository.getSessionsPath(mytenant)); + sessionRepository = tenant.getSessionRepo(); + assertThat(sessionRepository.getRemoteSessions().size(), is(0)); + createSession(sessionId, true, mytenant); + assertThat(sessionRepository.getRemoteSessions().size(), is(1)); + } + + private void assertStatusChange(long sessionId, Session.Status status) throws Exception { + com.yahoo.path.Path statePath = TenantRepository.getSessionsPath(tenantName).append("" + sessionId).append(ConfigCurator.SESSIONSTATE_ZK_SUBPATH); + curator.create(statePath); + curator.framework().setData().forPath(statePath.getAbsolute(), Utf8.toBytes(status.toString())); + assertSessionStatus(sessionId, status); + } + + private void assertSessionRemoved(long sessionId) { + waitFor(p -> sessionRepository.getRemoteSession(sessionId) == null, sessionId); + assertNull(sessionRepository.getRemoteSession(sessionId)); + } + + private void assertSessionExists(long sessionId) { + assertSessionStatus(sessionId, Session.Status.NEW); + } + + private void assertSessionStatus(long sessionId, Session.Status status) { + waitFor(p -> sessionRepository.getRemoteSession(sessionId) != null && + sessionRepository.getRemoteSession(sessionId).getStatus() == status, sessionId); + assertNotNull(sessionRepository.getRemoteSession(sessionId)); + assertThat(sessionRepository.getRemoteSession(sessionId).getStatus(), is(status)); + } + + private void waitFor(LongPredicate predicate, long sessionId) { + long endTime = System.currentTimeMillis() + 60_000; + boolean ok; + do { + ok = predicate.test(sessionId); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } while (System.currentTimeMillis() < endTime && !ok); + } + } |