diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-10-07 11:54:48 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-10-07 11:54:48 +0200 |
commit | 82d9aa13e986fe98f8eab8d31bd115097fae87c0 (patch) | |
tree | e0a6ea90b368f21821e96b3765388323d545c286 /configserver | |
parent | 63167e982d458a2a62fe6262078e3400e28e3632 (diff) |
Move code around, no functional changes
Diffstat (limited to 'configserver')
-rw-r--r-- | configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java | 357 |
1 files changed, 181 insertions, 176 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index 1a85a1dc0c9..fc1b25596f2 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -170,43 +170,6 @@ public class SessionRepository { return actions; } - public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) { - log.log(Level.FINE, () -> "Purging old sessions for tenant '" + tenantName + "'"); - try { - for (LocalSession candidate : localSessionCache.values()) { - Instant createTime = candidate.getCreateTime(); - log.log(Level.FINE, () -> "Candidate session for deletion: " + candidate.getSessionId() + ", created: " + createTime); - - // Sessions with state other than ACTIVATE - if (hasExpired(candidate) && !isActiveSession(candidate)) { - deleteLocalSession(candidate); - } else if (createTime.plus(Duration.ofDays(1)).isBefore(clock.instant())) { - // Sessions with state ACTIVATE, but which are not actually active - Optional<ApplicationId> applicationId = candidate.getOptionalApplicationId(); - if (applicationId.isEmpty()) continue; - Long activeSession = activeSessions.get(applicationId.get()); - if (activeSession == null || activeSession != candidate.getSessionId()) { - deleteLocalSession(candidate); - log.log(Level.INFO, "Deleted inactive session " + candidate.getSessionId() + " created " + - createTime + " for '" + applicationId + "'"); - } - } - } - // Make sure to catch here, to avoid executor just dying in case of issues ... - } catch (Throwable e) { - log.log(Level.WARNING, "Error when purging old sessions ", e); - } - log.log(Level.FINE, () -> "Done purging old sessions"); - } - - private boolean hasExpired(LocalSession candidate) { - return candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant()); - } - - private boolean isActiveSession(LocalSession candidate) { - return candidate.getStatus() == Session.Status.ACTIVATE; - } - // Will delete session data in ZooKeeper and file system public void deleteLocalSession(LocalSession session) { long sessionId = session.getSessionId(); @@ -219,24 +182,88 @@ public class SessionRepository { transaction.commit(); } - public void close() { - deleteAllSessions(); - tenantFileSystemDirs.delete(); + /** + * Creates a new deployment session from an application package. + * + * @param applicationDirectory a File pointing to an application. + * @param applicationId application id for this new session. + * @param timeoutBudget Timeout for creating session and waiting for other servers. + * @return a new session + */ + public LocalSession createSession(File applicationDirectory, ApplicationId applicationId, TimeoutBudget timeoutBudget) { + applicationRepo.createApplication(applicationId); + Optional<Long> activeSessionId = applicationRepo.activeSessionOf(applicationId); + return create(applicationDirectory, applicationId, activeSessionId, false, timeoutBudget); + } + + private LocalSession createSessionFromApplication(ApplicationPackage applicationPackage, + long sessionId, + TimeoutBudget timeoutBudget, + Clock clock) { + log.log(Level.FINE, () -> TenantRepository.logPre(tenantName) + "Creating session " + sessionId + " in ZooKeeper"); + SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); + sessionZKClient.createNewSession(clock.instant()); + Curator.CompletionWaiter waiter = sessionZKClient.getUploadWaiter(); + LocalSession session = createLocalSession(sessionId, applicationPackage); + waiter.awaitCompletion(timeoutBudget.timeLeft()); + return session; + } + + /** + * Creates a new deployment session from an already existing session. + * + * @param existingSession the session to use as base + * @param logger a deploy logger where the deploy log will be written. + * @param internalRedeploy whether this session is for a system internal redeploy — not an application package change + * @param timeoutBudget timeout for creating session and waiting for other servers. + * @return a new session + */ + public LocalSession createSessionFromExisting(Session existingSession, + DeployLogger logger, + boolean internalRedeploy, + TimeoutBudget timeoutBudget) { + File existingApp = getSessionAppDir(existingSession.getSessionId()); + ApplicationId existingApplicationId = existingSession.getApplicationId(); + + Optional<Long> activeSessionId = getActiveSessionId(existingApplicationId); + logger.log(Level.FINE, "Create new session for application id '" + existingApplicationId + "' from existing active session " + activeSessionId); + LocalSession session = create(existingApp, existingApplicationId, activeSessionId, internalRedeploy, timeoutBudget); + // Note: Needs to be kept in sync with calls in SessionPreparer.writeStateToZooKeeper() + session.setApplicationId(existingApplicationId); + if (existingSession.getApplicationPackageReference() != null) { + session.setApplicationPackageReference(existingSession.getApplicationPackageReference()); + } + session.setVespaVersion(existingSession.getVespaVersion()); + session.setDockerImageRepository(existingSession.getDockerImageRepository()); + session.setAthenzDomain(existingSession.getAthenzDomain()); + return session; + } + + private LocalSession create(File applicationFile, ApplicationId applicationId, Optional<Long> currentlyActiveSessionId, + boolean internalRedeploy, TimeoutBudget timeoutBudget) { + long sessionId = getNextSessionId(); try { - if (directoryCache != null) { - directoryCache.close(); - } + ensureSessionPathDoesNotExist(sessionId); + ApplicationPackage app = createApplicationPackage(applicationFile, applicationId, + sessionId, currentlyActiveSessionId, internalRedeploy); + return createSessionFromApplication(app, sessionId, timeoutBudget, clock); } catch (Exception e) { - log.log(Level.WARNING, "Exception when closing path cache", e); - } finally { - checkForRemovedSessions(new ArrayList<>()); + throw new RuntimeException("Error creating session " + sessionId, e); } } - private void deleteAllSessions() { - List<LocalSession> sessions = new ArrayList<>(localSessionCache.values()); - for (LocalSession session : sessions) { - deleteLocalSession(session); + /** + * This method is used when creating a session based on a remote session and the distributed application package + * It does not wait for session being created on other servers + */ + private LocalSession createLocalSession(File applicationFile, ApplicationId applicationId, long sessionId) { + try { + Optional<Long> currentlyActiveSessionId = getActiveSessionId(applicationId); + ApplicationPackage applicationPackage = createApplicationPackage(applicationFile, applicationId, + sessionId, currentlyActiveSessionId, false); + return createLocalSession(sessionId, applicationPackage); + } catch (Exception e) { + throw new RuntimeException("Error creating session " + sessionId, e); } } @@ -250,6 +277,15 @@ public class SessionRepository { return getSessionList(curator.getChildren(sessionsPath)); } + public synchronized RemoteSession createRemoteSession(long sessionId) { + SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); + RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient); + remoteSessionCache.put(sessionId, session); + loadSessionIfActive(session); + updateSessionStateWatcher(sessionId, session); + return session; + } + public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { int deleted = 0; for (long sessionId : getRemoteSessions()) { @@ -295,24 +331,6 @@ public class SessionRepository { getRemoteSessions().forEach(this::sessionAdded); } - private synchronized void sessionsChanged() throws NumberFormatException { - List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData()); - checkForRemovedSessions(sessions); - checkForAddedSessions(sessions); - } - - private void checkForRemovedSessions(List<Long> sessions) { - for (RemoteSession session : remoteSessionCache.values()) - if ( ! sessions.contains(session.getSessionId())) - sessionRemoved(session.getSessionId()); - } - - private void checkForAddedSessions(List<Long> sessions) { - for (Long sessionId : sessions) - if (remoteSessionCache.get(sessionId) == null) - sessionAdded(sessionId); - } - /** * A session for which we don't have a watcher, i.e. hitherto unknown to us. * @@ -406,6 +424,27 @@ public class SessionRepository { log.log(Level.FINE, "Done notifying upload for session " + sessionId); } + private ApplicationSet loadApplication(RemoteSession session) { + log.log(Level.FINE, () -> "Loading application for " + session); + SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId()); + ApplicationPackage applicationPackage = sessionZooKeeperClient.loadApplicationPackage(); + ActivatedModelsBuilder builder = new ActivatedModelsBuilder(session.getTenantName(), + session.getSessionId(), + sessionZooKeeperClient, + componentRegistry); + // Read hosts allocated on the config server instance which created this + SettableOptional<AllocatedHosts> allocatedHosts = new SettableOptional<>(applicationPackage.getAllocatedHosts()); + + return ApplicationSet.fromList(builder.buildModels(session.getApplicationId(), + sessionZooKeeperClient.readDockerImageRepository(), + sessionZooKeeperClient.readVespaVersion(), + applicationPackage, + allocatedHosts, + clock.instant())); + } + + // ---------------- Watcher stuff ---------------------------------------------------------------- + void notifyCompletion(Curator.CompletionWaiter completionWaiter, RemoteSession session) { try { completionWaiter.notifyCompletion(); @@ -430,25 +469,6 @@ public class SessionRepository { } } - private ApplicationSet loadApplication(RemoteSession session) { - log.log(Level.FINE, () -> "Loading application for " + session); - SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId()); - ApplicationPackage applicationPackage = sessionZooKeeperClient.loadApplicationPackage(); - ActivatedModelsBuilder builder = new ActivatedModelsBuilder(session.getTenantName(), - session.getSessionId(), - sessionZooKeeperClient, - componentRegistry); - // Read hosts allocated on the config server instance which created this - SettableOptional<AllocatedHosts> allocatedHosts = new SettableOptional<>(applicationPackage.getAllocatedHosts()); - - return ApplicationSet.fromList(builder.buildModels(session.getApplicationId(), - sessionZooKeeperClient.readDockerImageRepository(), - sessionZooKeeperClient.readVespaVersion(), - applicationPackage, - allocatedHosts, - clock.instant())); - } - private void nodeChanged() { zkWatcherExecutor.execute(() -> { Multiset<Session.Status> sessionMetrics = HashMultiset.create(); @@ -478,27 +498,43 @@ public class SessionRepository { }); } - /** - * Creates a new deployment session from an application package. - * - * @param applicationDirectory a File pointing to an application. - * @param applicationId application id for this new session. - * @param timeoutBudget Timeout for creating session and waiting for other servers. - * @return a new session - */ - public LocalSession createSession(File applicationDirectory, ApplicationId applicationId, TimeoutBudget timeoutBudget) { - applicationRepo.createApplication(applicationId); - Optional<Long> activeSessionId = applicationRepo.activeSessionOf(applicationId); - return create(applicationDirectory, applicationId, activeSessionId, false, timeoutBudget); + // ---------------- Common stuff ---------------------------------------------------------------- + + public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) { + log.log(Level.FINE, () -> "Purging old sessions for tenant '" + tenantName + "'"); + try { + for (LocalSession candidate : localSessionCache.values()) { + Instant createTime = candidate.getCreateTime(); + log.log(Level.FINE, () -> "Candidate session for deletion: " + candidate.getSessionId() + ", created: " + createTime); + + // Sessions with state other than ACTIVATE + if (hasExpired(candidate) && !isActiveSession(candidate)) { + deleteLocalSession(candidate); + } else if (createTime.plus(Duration.ofDays(1)).isBefore(clock.instant())) { + // Sessions with state ACTIVATE, but which are not actually active + Optional<ApplicationId> applicationId = candidate.getOptionalApplicationId(); + if (applicationId.isEmpty()) continue; + Long activeSession = activeSessions.get(applicationId.get()); + if (activeSession == null || activeSession != candidate.getSessionId()) { + deleteLocalSession(candidate); + log.log(Level.INFO, "Deleted inactive session " + candidate.getSessionId() + " created " + + createTime + " for '" + applicationId + "'"); + } + } + } + // Make sure to catch here, to avoid executor just dying in case of issues ... + } catch (Throwable e) { + log.log(Level.WARNING, "Error when purging old sessions ", e); + } + log.log(Level.FINE, () -> "Done purging old sessions"); } - public synchronized RemoteSession createRemoteSession(long sessionId) { - SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); - RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient); - remoteSessionCache.put(sessionId, session); - loadSessionIfActive(session); - updateSessionStateWatcher(sessionId, session); - return session; + private boolean isActiveSession(LocalSession candidate) { + return candidate.getStatus() == Session.Status.ACTIVATE; + } + + private boolean hasExpired(LocalSession candidate) { + return candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant()); } private void ensureSessionPathDoesNotExist(long sessionId) { @@ -524,76 +560,6 @@ public class SessionRepository { return FilesApplicationPackage.fromFileWithDeployData(configApplicationDir, deployData); } - private LocalSession createSessionFromApplication(ApplicationPackage applicationPackage, - long sessionId, - TimeoutBudget timeoutBudget, - Clock clock) { - log.log(Level.FINE, () -> TenantRepository.logPre(tenantName) + "Creating session " + sessionId + " in ZooKeeper"); - SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); - sessionZKClient.createNewSession(clock.instant()); - Curator.CompletionWaiter waiter = sessionZKClient.getUploadWaiter(); - LocalSession session = createLocalSession(sessionId, applicationPackage); - waiter.awaitCompletion(timeoutBudget.timeLeft()); - return session; - } - - /** - * Creates a new deployment session from an already existing session. - * - * @param existingSession the session to use as base - * @param logger a deploy logger where the deploy log will be written. - * @param internalRedeploy whether this session is for a system internal redeploy — not an application package change - * @param timeoutBudget timeout for creating session and waiting for other servers. - * @return a new session - */ - public LocalSession createSessionFromExisting(Session existingSession, - DeployLogger logger, - boolean internalRedeploy, - TimeoutBudget timeoutBudget) { - File existingApp = getSessionAppDir(existingSession.getSessionId()); - ApplicationId existingApplicationId = existingSession.getApplicationId(); - - Optional<Long> activeSessionId = getActiveSessionId(existingApplicationId); - logger.log(Level.FINE, "Create new session for application id '" + existingApplicationId + "' from existing active session " + activeSessionId); - LocalSession session = create(existingApp, existingApplicationId, activeSessionId, internalRedeploy, timeoutBudget); - // Note: Needs to be kept in sync with calls in SessionPreparer.writeStateToZooKeeper() - session.setApplicationId(existingApplicationId); - if (existingSession.getApplicationPackageReference() != null) { - session.setApplicationPackageReference(existingSession.getApplicationPackageReference()); - } - session.setVespaVersion(existingSession.getVespaVersion()); - session.setDockerImageRepository(existingSession.getDockerImageRepository()); - session.setAthenzDomain(existingSession.getAthenzDomain()); - return session; - } - - private LocalSession create(File applicationFile, ApplicationId applicationId, Optional<Long> currentlyActiveSessionId, - boolean internalRedeploy, TimeoutBudget timeoutBudget) { - long sessionId = getNextSessionId(); - try { - ensureSessionPathDoesNotExist(sessionId); - ApplicationPackage app = createApplicationPackage(applicationFile, applicationId, - sessionId, currentlyActiveSessionId, internalRedeploy); - return createSessionFromApplication(app, sessionId, timeoutBudget, clock); - } catch (Exception e) { - throw new RuntimeException("Error creating session " + sessionId, e); - } - } - - /** - * This method is used when creating a session based on a remote session and the distributed application package - * It does not wait for session being created on other servers - */ - private LocalSession createLocalSession(File applicationFile, ApplicationId applicationId, long sessionId) { - try { - Optional<Long> currentlyActiveSessionId = getActiveSessionId(applicationId); - ApplicationPackage applicationPackage = createApplicationPackage(applicationFile, applicationId, - sessionId, currentlyActiveSessionId, false); - return createLocalSession(sessionId, applicationPackage); - } catch (Exception e) { - throw new RuntimeException("Error creating session " + sessionId, e); - } - } private ApplicationPackage createApplicationPackage(File applicationFile, ApplicationId applicationId, long sessionId, Optional<Long> currentlyActiveSessionId, @@ -745,12 +711,51 @@ public class SessionRepository { public Clock clock() { return clock; } + public void close() { + deleteAllSessions(); + tenantFileSystemDirs.delete(); + try { + if (directoryCache != null) { + directoryCache.close(); + } + } catch (Exception e) { + log.log(Level.WARNING, "Exception when closing path cache", e); + } finally { + checkForRemovedSessions(new ArrayList<>()); + } + } + public Transaction createActivateTransaction(Session session) { Transaction transaction = createSetStatusTransaction(session, Session.Status.ACTIVATE); transaction.add(applicationRepo.createPutTransaction(session.getApplicationId(), session.getSessionId()).operations()); return transaction; } + private synchronized void sessionsChanged() throws NumberFormatException { + List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData()); + checkForRemovedSessions(sessions); + checkForAddedSessions(sessions); + } + + private void checkForRemovedSessions(List<Long> sessions) { + for (RemoteSession session : remoteSessionCache.values()) + if ( ! sessions.contains(session.getSessionId())) + sessionRemoved(session.getSessionId()); + } + + private void checkForAddedSessions(List<Long> sessions) { + for (Long sessionId : sessions) + if (remoteSessionCache.get(sessionId) == null) + sessionAdded(sessionId); + } + + private void deleteAllSessions() { + List<LocalSession> sessions = new ArrayList<>(localSessionCache.values()); + for (LocalSession session : sessions) { + deleteLocalSession(session); + } + } + private Transaction createSetStatusTransaction(Session session, Session.Status status) { return session.sessionZooKeeperClient.createWriteStatusTransaction(status); } |