summaryrefslogtreecommitdiffstats
path: root/configserver
diff options
context:
space:
mode:
authorHarald Musum <musum@verizonmedia.com>2020-10-07 11:54:48 +0200
committerHarald Musum <musum@verizonmedia.com>2020-10-07 11:54:48 +0200
commit82d9aa13e986fe98f8eab8d31bd115097fae87c0 (patch)
treee0a6ea90b368f21821e96b3765388323d545c286 /configserver
parent63167e982d458a2a62fe6262078e3400e28e3632 (diff)
Move code around, no functional changes
Diffstat (limited to 'configserver')
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java357
1 files changed, 181 insertions, 176 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
index 1a85a1dc0c9..fc1b25596f2 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java
@@ -170,43 +170,6 @@ public class SessionRepository {
return actions;
}
- public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) {
- log.log(Level.FINE, () -> "Purging old sessions for tenant '" + tenantName + "'");
- try {
- for (LocalSession candidate : localSessionCache.values()) {
- Instant createTime = candidate.getCreateTime();
- log.log(Level.FINE, () -> "Candidate session for deletion: " + candidate.getSessionId() + ", created: " + createTime);
-
- // Sessions with state other than ACTIVATE
- if (hasExpired(candidate) && !isActiveSession(candidate)) {
- deleteLocalSession(candidate);
- } else if (createTime.plus(Duration.ofDays(1)).isBefore(clock.instant())) {
- // Sessions with state ACTIVATE, but which are not actually active
- Optional<ApplicationId> applicationId = candidate.getOptionalApplicationId();
- if (applicationId.isEmpty()) continue;
- Long activeSession = activeSessions.get(applicationId.get());
- if (activeSession == null || activeSession != candidate.getSessionId()) {
- deleteLocalSession(candidate);
- log.log(Level.INFO, "Deleted inactive session " + candidate.getSessionId() + " created " +
- createTime + " for '" + applicationId + "'");
- }
- }
- }
- // Make sure to catch here, to avoid executor just dying in case of issues ...
- } catch (Throwable e) {
- log.log(Level.WARNING, "Error when purging old sessions ", e);
- }
- log.log(Level.FINE, () -> "Done purging old sessions");
- }
-
- private boolean hasExpired(LocalSession candidate) {
- return candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant());
- }
-
- private boolean isActiveSession(LocalSession candidate) {
- return candidate.getStatus() == Session.Status.ACTIVATE;
- }
-
// Will delete session data in ZooKeeper and file system
public void deleteLocalSession(LocalSession session) {
long sessionId = session.getSessionId();
@@ -219,24 +182,88 @@ public class SessionRepository {
transaction.commit();
}
- public void close() {
- deleteAllSessions();
- tenantFileSystemDirs.delete();
+ /**
+ * Creates a new deployment session from an application package.
+ *
+ * @param applicationDirectory a File pointing to an application.
+ * @param applicationId application id for this new session.
+ * @param timeoutBudget Timeout for creating session and waiting for other servers.
+ * @return a new session
+ */
+ public LocalSession createSession(File applicationDirectory, ApplicationId applicationId, TimeoutBudget timeoutBudget) {
+ applicationRepo.createApplication(applicationId);
+ Optional<Long> activeSessionId = applicationRepo.activeSessionOf(applicationId);
+ return create(applicationDirectory, applicationId, activeSessionId, false, timeoutBudget);
+ }
+
+ private LocalSession createSessionFromApplication(ApplicationPackage applicationPackage,
+ long sessionId,
+ TimeoutBudget timeoutBudget,
+ Clock clock) {
+ log.log(Level.FINE, () -> TenantRepository.logPre(tenantName) + "Creating session " + sessionId + " in ZooKeeper");
+ SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
+ sessionZKClient.createNewSession(clock.instant());
+ Curator.CompletionWaiter waiter = sessionZKClient.getUploadWaiter();
+ LocalSession session = createLocalSession(sessionId, applicationPackage);
+ waiter.awaitCompletion(timeoutBudget.timeLeft());
+ return session;
+ }
+
+ /**
+ * Creates a new deployment session from an already existing session.
+ *
+ * @param existingSession the session to use as base
+ * @param logger a deploy logger where the deploy log will be written.
+ * @param internalRedeploy whether this session is for a system internal redeploy — not an application package change
+ * @param timeoutBudget timeout for creating session and waiting for other servers.
+ * @return a new session
+ */
+ public LocalSession createSessionFromExisting(Session existingSession,
+ DeployLogger logger,
+ boolean internalRedeploy,
+ TimeoutBudget timeoutBudget) {
+ File existingApp = getSessionAppDir(existingSession.getSessionId());
+ ApplicationId existingApplicationId = existingSession.getApplicationId();
+
+ Optional<Long> activeSessionId = getActiveSessionId(existingApplicationId);
+ logger.log(Level.FINE, "Create new session for application id '" + existingApplicationId + "' from existing active session " + activeSessionId);
+ LocalSession session = create(existingApp, existingApplicationId, activeSessionId, internalRedeploy, timeoutBudget);
+ // Note: Needs to be kept in sync with calls in SessionPreparer.writeStateToZooKeeper()
+ session.setApplicationId(existingApplicationId);
+ if (existingSession.getApplicationPackageReference() != null) {
+ session.setApplicationPackageReference(existingSession.getApplicationPackageReference());
+ }
+ session.setVespaVersion(existingSession.getVespaVersion());
+ session.setDockerImageRepository(existingSession.getDockerImageRepository());
+ session.setAthenzDomain(existingSession.getAthenzDomain());
+ return session;
+ }
+
+ private LocalSession create(File applicationFile, ApplicationId applicationId, Optional<Long> currentlyActiveSessionId,
+ boolean internalRedeploy, TimeoutBudget timeoutBudget) {
+ long sessionId = getNextSessionId();
try {
- if (directoryCache != null) {
- directoryCache.close();
- }
+ ensureSessionPathDoesNotExist(sessionId);
+ ApplicationPackage app = createApplicationPackage(applicationFile, applicationId,
+ sessionId, currentlyActiveSessionId, internalRedeploy);
+ return createSessionFromApplication(app, sessionId, timeoutBudget, clock);
} catch (Exception e) {
- log.log(Level.WARNING, "Exception when closing path cache", e);
- } finally {
- checkForRemovedSessions(new ArrayList<>());
+ throw new RuntimeException("Error creating session " + sessionId, e);
}
}
- private void deleteAllSessions() {
- List<LocalSession> sessions = new ArrayList<>(localSessionCache.values());
- for (LocalSession session : sessions) {
- deleteLocalSession(session);
+ /**
+ * This method is used when creating a session based on a remote session and the distributed application package
+ * It does not wait for session being created on other servers
+ */
+ private LocalSession createLocalSession(File applicationFile, ApplicationId applicationId, long sessionId) {
+ try {
+ Optional<Long> currentlyActiveSessionId = getActiveSessionId(applicationId);
+ ApplicationPackage applicationPackage = createApplicationPackage(applicationFile, applicationId,
+ sessionId, currentlyActiveSessionId, false);
+ return createLocalSession(sessionId, applicationPackage);
+ } catch (Exception e) {
+ throw new RuntimeException("Error creating session " + sessionId, e);
}
}
@@ -250,6 +277,15 @@ public class SessionRepository {
return getSessionList(curator.getChildren(sessionsPath));
}
+ public synchronized RemoteSession createRemoteSession(long sessionId) {
+ SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
+ RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient);
+ remoteSessionCache.put(sessionId, session);
+ loadSessionIfActive(session);
+ updateSessionStateWatcher(sessionId, session);
+ return session;
+ }
+
public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) {
int deleted = 0;
for (long sessionId : getRemoteSessions()) {
@@ -295,24 +331,6 @@ public class SessionRepository {
getRemoteSessions().forEach(this::sessionAdded);
}
- private synchronized void sessionsChanged() throws NumberFormatException {
- List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData());
- checkForRemovedSessions(sessions);
- checkForAddedSessions(sessions);
- }
-
- private void checkForRemovedSessions(List<Long> sessions) {
- for (RemoteSession session : remoteSessionCache.values())
- if ( ! sessions.contains(session.getSessionId()))
- sessionRemoved(session.getSessionId());
- }
-
- private void checkForAddedSessions(List<Long> sessions) {
- for (Long sessionId : sessions)
- if (remoteSessionCache.get(sessionId) == null)
- sessionAdded(sessionId);
- }
-
/**
* A session for which we don't have a watcher, i.e. hitherto unknown to us.
*
@@ -406,6 +424,27 @@ public class SessionRepository {
log.log(Level.FINE, "Done notifying upload for session " + sessionId);
}
+ private ApplicationSet loadApplication(RemoteSession session) {
+ log.log(Level.FINE, () -> "Loading application for " + session);
+ SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId());
+ ApplicationPackage applicationPackage = sessionZooKeeperClient.loadApplicationPackage();
+ ActivatedModelsBuilder builder = new ActivatedModelsBuilder(session.getTenantName(),
+ session.getSessionId(),
+ sessionZooKeeperClient,
+ componentRegistry);
+ // Read hosts allocated on the config server instance which created this
+ SettableOptional<AllocatedHosts> allocatedHosts = new SettableOptional<>(applicationPackage.getAllocatedHosts());
+
+ return ApplicationSet.fromList(builder.buildModels(session.getApplicationId(),
+ sessionZooKeeperClient.readDockerImageRepository(),
+ sessionZooKeeperClient.readVespaVersion(),
+ applicationPackage,
+ allocatedHosts,
+ clock.instant()));
+ }
+
+ // ---------------- Watcher stuff ----------------------------------------------------------------
+
void notifyCompletion(Curator.CompletionWaiter completionWaiter, RemoteSession session) {
try {
completionWaiter.notifyCompletion();
@@ -430,25 +469,6 @@ public class SessionRepository {
}
}
- private ApplicationSet loadApplication(RemoteSession session) {
- log.log(Level.FINE, () -> "Loading application for " + session);
- SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId());
- ApplicationPackage applicationPackage = sessionZooKeeperClient.loadApplicationPackage();
- ActivatedModelsBuilder builder = new ActivatedModelsBuilder(session.getTenantName(),
- session.getSessionId(),
- sessionZooKeeperClient,
- componentRegistry);
- // Read hosts allocated on the config server instance which created this
- SettableOptional<AllocatedHosts> allocatedHosts = new SettableOptional<>(applicationPackage.getAllocatedHosts());
-
- return ApplicationSet.fromList(builder.buildModels(session.getApplicationId(),
- sessionZooKeeperClient.readDockerImageRepository(),
- sessionZooKeeperClient.readVespaVersion(),
- applicationPackage,
- allocatedHosts,
- clock.instant()));
- }
-
private void nodeChanged() {
zkWatcherExecutor.execute(() -> {
Multiset<Session.Status> sessionMetrics = HashMultiset.create();
@@ -478,27 +498,43 @@ public class SessionRepository {
});
}
- /**
- * Creates a new deployment session from an application package.
- *
- * @param applicationDirectory a File pointing to an application.
- * @param applicationId application id for this new session.
- * @param timeoutBudget Timeout for creating session and waiting for other servers.
- * @return a new session
- */
- public LocalSession createSession(File applicationDirectory, ApplicationId applicationId, TimeoutBudget timeoutBudget) {
- applicationRepo.createApplication(applicationId);
- Optional<Long> activeSessionId = applicationRepo.activeSessionOf(applicationId);
- return create(applicationDirectory, applicationId, activeSessionId, false, timeoutBudget);
+ // ---------------- Common stuff ----------------------------------------------------------------
+
+ public void deleteExpiredSessions(Map<ApplicationId, Long> activeSessions) {
+ log.log(Level.FINE, () -> "Purging old sessions for tenant '" + tenantName + "'");
+ try {
+ for (LocalSession candidate : localSessionCache.values()) {
+ Instant createTime = candidate.getCreateTime();
+ log.log(Level.FINE, () -> "Candidate session for deletion: " + candidate.getSessionId() + ", created: " + createTime);
+
+ // Sessions with state other than ACTIVATE
+ if (hasExpired(candidate) && !isActiveSession(candidate)) {
+ deleteLocalSession(candidate);
+ } else if (createTime.plus(Duration.ofDays(1)).isBefore(clock.instant())) {
+ // Sessions with state ACTIVATE, but which are not actually active
+ Optional<ApplicationId> applicationId = candidate.getOptionalApplicationId();
+ if (applicationId.isEmpty()) continue;
+ Long activeSession = activeSessions.get(applicationId.get());
+ if (activeSession == null || activeSession != candidate.getSessionId()) {
+ deleteLocalSession(candidate);
+ log.log(Level.INFO, "Deleted inactive session " + candidate.getSessionId() + " created " +
+ createTime + " for '" + applicationId + "'");
+ }
+ }
+ }
+ // Make sure to catch here, to avoid executor just dying in case of issues ...
+ } catch (Throwable e) {
+ log.log(Level.WARNING, "Error when purging old sessions ", e);
+ }
+ log.log(Level.FINE, () -> "Done purging old sessions");
}
- public synchronized RemoteSession createRemoteSession(long sessionId) {
- SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
- RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient);
- remoteSessionCache.put(sessionId, session);
- loadSessionIfActive(session);
- updateSessionStateWatcher(sessionId, session);
- return session;
+ private boolean isActiveSession(LocalSession candidate) {
+ return candidate.getStatus() == Session.Status.ACTIVATE;
+ }
+
+ private boolean hasExpired(LocalSession candidate) {
+ return candidate.getCreateTime().plus(sessionLifetime).isBefore(clock.instant());
}
private void ensureSessionPathDoesNotExist(long sessionId) {
@@ -524,76 +560,6 @@ public class SessionRepository {
return FilesApplicationPackage.fromFileWithDeployData(configApplicationDir, deployData);
}
- private LocalSession createSessionFromApplication(ApplicationPackage applicationPackage,
- long sessionId,
- TimeoutBudget timeoutBudget,
- Clock clock) {
- log.log(Level.FINE, () -> TenantRepository.logPre(tenantName) + "Creating session " + sessionId + " in ZooKeeper");
- SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId);
- sessionZKClient.createNewSession(clock.instant());
- Curator.CompletionWaiter waiter = sessionZKClient.getUploadWaiter();
- LocalSession session = createLocalSession(sessionId, applicationPackage);
- waiter.awaitCompletion(timeoutBudget.timeLeft());
- return session;
- }
-
- /**
- * Creates a new deployment session from an already existing session.
- *
- * @param existingSession the session to use as base
- * @param logger a deploy logger where the deploy log will be written.
- * @param internalRedeploy whether this session is for a system internal redeploy — not an application package change
- * @param timeoutBudget timeout for creating session and waiting for other servers.
- * @return a new session
- */
- public LocalSession createSessionFromExisting(Session existingSession,
- DeployLogger logger,
- boolean internalRedeploy,
- TimeoutBudget timeoutBudget) {
- File existingApp = getSessionAppDir(existingSession.getSessionId());
- ApplicationId existingApplicationId = existingSession.getApplicationId();
-
- Optional<Long> activeSessionId = getActiveSessionId(existingApplicationId);
- logger.log(Level.FINE, "Create new session for application id '" + existingApplicationId + "' from existing active session " + activeSessionId);
- LocalSession session = create(existingApp, existingApplicationId, activeSessionId, internalRedeploy, timeoutBudget);
- // Note: Needs to be kept in sync with calls in SessionPreparer.writeStateToZooKeeper()
- session.setApplicationId(existingApplicationId);
- if (existingSession.getApplicationPackageReference() != null) {
- session.setApplicationPackageReference(existingSession.getApplicationPackageReference());
- }
- session.setVespaVersion(existingSession.getVespaVersion());
- session.setDockerImageRepository(existingSession.getDockerImageRepository());
- session.setAthenzDomain(existingSession.getAthenzDomain());
- return session;
- }
-
- private LocalSession create(File applicationFile, ApplicationId applicationId, Optional<Long> currentlyActiveSessionId,
- boolean internalRedeploy, TimeoutBudget timeoutBudget) {
- long sessionId = getNextSessionId();
- try {
- ensureSessionPathDoesNotExist(sessionId);
- ApplicationPackage app = createApplicationPackage(applicationFile, applicationId,
- sessionId, currentlyActiveSessionId, internalRedeploy);
- return createSessionFromApplication(app, sessionId, timeoutBudget, clock);
- } catch (Exception e) {
- throw new RuntimeException("Error creating session " + sessionId, e);
- }
- }
-
- /**
- * This method is used when creating a session based on a remote session and the distributed application package
- * It does not wait for session being created on other servers
- */
- private LocalSession createLocalSession(File applicationFile, ApplicationId applicationId, long sessionId) {
- try {
- Optional<Long> currentlyActiveSessionId = getActiveSessionId(applicationId);
- ApplicationPackage applicationPackage = createApplicationPackage(applicationFile, applicationId,
- sessionId, currentlyActiveSessionId, false);
- return createLocalSession(sessionId, applicationPackage);
- } catch (Exception e) {
- throw new RuntimeException("Error creating session " + sessionId, e);
- }
- }
private ApplicationPackage createApplicationPackage(File applicationFile, ApplicationId applicationId,
long sessionId, Optional<Long> currentlyActiveSessionId,
@@ -745,12 +711,51 @@ public class SessionRepository {
public Clock clock() { return clock; }
+ public void close() {
+ deleteAllSessions();
+ tenantFileSystemDirs.delete();
+ try {
+ if (directoryCache != null) {
+ directoryCache.close();
+ }
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Exception when closing path cache", e);
+ } finally {
+ checkForRemovedSessions(new ArrayList<>());
+ }
+ }
+
public Transaction createActivateTransaction(Session session) {
Transaction transaction = createSetStatusTransaction(session, Session.Status.ACTIVATE);
transaction.add(applicationRepo.createPutTransaction(session.getApplicationId(), session.getSessionId()).operations());
return transaction;
}
+ private synchronized void sessionsChanged() throws NumberFormatException {
+ List<Long> sessions = getSessionListFromDirectoryCache(directoryCache.getCurrentData());
+ checkForRemovedSessions(sessions);
+ checkForAddedSessions(sessions);
+ }
+
+ private void checkForRemovedSessions(List<Long> sessions) {
+ for (RemoteSession session : remoteSessionCache.values())
+ if ( ! sessions.contains(session.getSessionId()))
+ sessionRemoved(session.getSessionId());
+ }
+
+ private void checkForAddedSessions(List<Long> sessions) {
+ for (Long sessionId : sessions)
+ if (remoteSessionCache.get(sessionId) == null)
+ sessionAdded(sessionId);
+ }
+
+ private void deleteAllSessions() {
+ List<LocalSession> sessions = new ArrayList<>(localSessionCache.values());
+ for (LocalSession session : sessions) {
+ deleteLocalSession(session);
+ }
+ }
+
private Transaction createSetStatusTransaction(Session session, Session.Status status) {
return session.sessionZooKeeperClient.createWriteStatusTransaction(status);
}