diff options
author | Harald Musum <musum@verizonmedia.com> | 2020-09-09 13:50:00 +0200 |
---|---|---|
committer | Harald Musum <musum@verizonmedia.com> | 2020-09-09 13:50:00 +0200 |
commit | b5941bdd129e6291de712768f1e53372f40a2a6b (patch) | |
tree | f8b1a0828c3739fa5f80d5caa17953c09120a1c5 | |
parent | 703a69752f6e125c336b11ad3bba455e143beed6 (diff) |
Clean up code for adding and deleting remote sessions
3 files changed, 63 insertions, 43 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSession.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSession.java index f551fcb6005..78a37499cef 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSession.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/RemoteSession.java @@ -25,7 +25,7 @@ public class RemoteSession extends Session { * @param sessionId The session id for this session. * @param zooKeeperClient a SessionZooKeeperClient instance */ - public RemoteSession(TenantName tenant, long sessionId, SessionZooKeeperClient zooKeeperClient) { + RemoteSession(TenantName tenant, long sessionId, SessionZooKeeperClient zooKeeperClient) { this(tenant, sessionId, zooKeeperClient, Optional.empty()); } @@ -35,11 +35,12 @@ public class RemoteSession extends Session { * @param tenant The name of the tenant creating session * @param sessionId The session id for this session. * @param zooKeeperClient a SessionZooKeeperClient instance + * @param applicationSet current application set for this session */ - private RemoteSession(TenantName tenant, - long sessionId, - SessionZooKeeperClient zooKeeperClient, - Optional<ApplicationSet> applicationSet) { + RemoteSession(TenantName tenant, + long sessionId, + SessionZooKeeperClient zooKeeperClient, + Optional<ApplicationSet> applicationSet) { super(tenant, sessionId, zooKeeperClient); this.applicationSet = applicationSet; } @@ -61,4 +62,9 @@ public class RemoteSession extends Session { return sessionZooKeeperClient.createWriteStatusTransaction(Status.DELETE); } + @Override + public String toString() { + return super.toString() + ",application set=" + applicationSet; + } + } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index de6314ca3b8..b251e2c6564 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -127,9 +127,7 @@ public class SessionRepository { public synchronized void addLocalSession(LocalSession session) { localSessionCache.putSession(session); - long sessionId = session.getSessionId(); - RemoteSession remoteSession = createRemoteSession(sessionId); - addSessionStateWatcher(remoteSession); + createRemoteSession(session.getSessionId()); } public LocalSession getLocalSession(long sessionId) { @@ -267,11 +265,6 @@ public class SessionRepository { return getSessionList(curator.getChildren(sessionsPath)); } - public void addRemoteSession(RemoteSession session) { - remoteSessionCache.putSession(session); - metrics.incAddedSessions(); - } - public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { int deleted = 0; for (long sessionId : getRemoteSessions()) { @@ -280,7 +273,7 @@ public class SessionRepository { if (session.getStatus() == Session.Status.ACTIVATE) continue; if (sessionHasExpired(session.getCreateTime(), expiryTime, clock)) { log.log(Level.FINE, () -> "Remote session " + sessionId + " for " + tenantName + " has expired, deleting it"); - deleteSession(session); + deleteRemoteSession(session); deleted++; } } @@ -288,10 +281,12 @@ public class SessionRepository { } public void deactivate(RemoteSession remoteSession) { - remoteSessionCache.putSession(remoteSession.deactivated()); + RemoteSession session = remoteSession.deactivated(); + remoteSessionCache.putSession(session); + updateSessionStateWatcher(session); } - public void deleteSession(RemoteSession session) { + public void deleteRemoteSession(RemoteSession session) { SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId()); Transaction transaction = sessionZooKeeperClient.deleteTransaction(); transaction.commit(); @@ -363,10 +358,8 @@ public class SessionRepository { log.log(Level.FINE, () -> "Adding remote session to SessionRepository: " + sessionId); RemoteSession remoteSession = createRemoteSession(sessionId); loadSessionIfActive(remoteSession); - addRemoteSession(remoteSession); if (distributeApplicationPackage()) createLocalSessionUsingDistributedApplicationPackage(sessionId); - addSessionStateWatcher(remoteSession); } void activate(RemoteSession session) { @@ -381,8 +374,10 @@ public class SessionRepository { log.log(Level.INFO, session.logPre() + "Session activated: " + sessionId); } - void deleteSession(RemoteSession remoteSession, Optional<LocalSession> localSession) { - localSession.ifPresent(this::deleteLocalSession); + void deleteSession(RemoteSession remoteSession) { + LocalSession localSession = getLocalSession(remoteSession.getSessionId()); + if (localSession != null) + deleteLocalSession(localSession); deactivate(remoteSession); } @@ -422,7 +417,10 @@ public class SessionRepository { } ApplicationSet applicationSet = loadApplication(session); - remoteSessionCache.putSession(session.activated(applicationSet)); + RemoteSession activated = session.activated(applicationSet); + remoteSessionCache.putSession(activated); + updateSessionStateWatcher(activated); + return applicationSet; } } @@ -460,6 +458,7 @@ public class SessionRepository { } private ApplicationSet loadApplication(RemoteSession session) { + log.log(Level.FINE, () -> "Loading application for " + session); SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId()); ApplicationPackage applicationPackage = sessionZooKeeperClient.loadApplicationPackage(); ActivatedModelsBuilder builder = new ActivatedModelsBuilder(session.getTenantName(), @@ -529,8 +528,16 @@ public class SessionRepository { } public RemoteSession createRemoteSession(long sessionId) { + return createRemoteSession(sessionId, Optional.empty()); + } + + public RemoteSession createRemoteSession(long sessionId, Optional<ApplicationSet> applicationSet) { SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); - return new RemoteSession(tenantName, sessionId, sessionZKClient); + RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient, applicationSet); + remoteSessionCache.putSession(session); + updateSessionStateWatcher(session); + metrics.incAddedSessions(); + return session; } private void ensureSessionPathDoesNotExist(long sessionId) { @@ -678,10 +685,11 @@ public class SessionRepository { * Returns a new local session for the given session id if it does not already exist. * Will also add the session to the local session cache if necessary */ - public Optional<LocalSession> createLocalSessionUsingDistributedApplicationPackage(long sessionId) { + public void createLocalSessionUsingDistributedApplicationPackage(long sessionId) { if (applicationRepo.hasLocalSession(sessionId)) { log.log(Level.FINE, () -> "Local session for session id " + sessionId + " already exists"); - return Optional.of(createSessionFromId(sessionId)); + createSessionFromId(sessionId); + return; } SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); @@ -697,16 +705,14 @@ public class SessionRepository { // We cannot be guaranteed that the file reference exists (it could be that it has not // been downloaded yet), and e.g when bootstrapping we cannot throw an exception in that case log.log(Level.FINE, () -> "File reference for session id " + sessionId + ": " + fileReference + " not found in " + fileDirectory); - return Optional.empty(); + return; } ApplicationId applicationId = sessionZKClient.readApplicationId() .orElseThrow(() -> new RuntimeException("Could not find application id for session " + sessionId)); log.log(Level.FINE, () -> "Creating local session for tenant '" + tenantName + "' with session id " + sessionId); LocalSession localSession = createLocalSession(sessionDir, applicationId, sessionId); addLocalSession(localSession); - return Optional.of(localSession); } - return Optional.empty(); } private Optional<Long> getActiveSessionId(ApplicationId applicationId) { @@ -745,12 +751,15 @@ public class SessionRepository { return new TenantFileSystemDirs(componentRegistry.getConfigServerDB(), tenantName).getUserApplicationDir(sessionId); } - private void addSessionStateWatcher(RemoteSession remoteSession) { - long sessionId = remoteSession.getSessionId(); - if ( ! sessionStateWatchers.containsKey(sessionId)) { + private void updateSessionStateWatcher(RemoteSession session) { + long sessionId = session.getSessionId(); + SessionStateWatcher sessionStateWatcher = sessionStateWatchers.get(sessionId); + if (sessionStateWatcher == null) { Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(sessionId).getAbsolute(), false); fileCache.addListener(this::nodeChanged); - sessionStateWatchers.put(sessionId, new SessionStateWatcher(fileCache, remoteSession, metrics, zkWatcherExecutor, this)); + sessionStateWatchers.put(sessionId, new SessionStateWatcher(fileCache, session, metrics, zkWatcherExecutor, this)); + } else { + sessionStateWatcher.setSession(session); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java index d5393679a84..2fe0f9d6cbc 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java @@ -24,18 +24,18 @@ public class SessionStateWatcher { private static final Logger log = Logger.getLogger(SessionStateWatcher.class.getName()); private final Curator.FileCache fileCache; - private final RemoteSession remoteSession; + private RemoteSession session; private final MetricUpdater metrics; private final Executor zkWatcherExecutor; private final SessionRepository sessionRepository; SessionStateWatcher(Curator.FileCache fileCache, - RemoteSession remoteSession, + RemoteSession session, MetricUpdater metrics, Executor zkWatcherExecutor, SessionRepository sessionRepository) { this.fileCache = fileCache; - this.remoteSession = remoteSession; + this.session = session; this.metrics = metrics; this.fileCache.addListener(this::nodeChanged); this.fileCache.start(); @@ -44,24 +44,24 @@ public class SessionStateWatcher { } private void sessionStatusChanged(Status newStatus) { - long sessionId = remoteSession.getSessionId(); + long sessionId = session.getSessionId(); switch (newStatus) { case NEW: case NONE: break; case PREPARE: createLocalSession(sessionId); - sessionRepository.prepareRemoteSession(remoteSession); + sessionRepository.prepareRemoteSession(session); break; case ACTIVATE: createLocalSession(sessionId); - sessionRepository.activate(remoteSession); + sessionRepository.activate(session); break; case DEACTIVATE: - sessionRepository.deactivate(remoteSession); + sessionRepository.deactivate(session); break; case DELETE: - sessionRepository.deleteSession(remoteSession); + sessionRepository.deleteSession(session); break; default: throw new IllegalStateException("Unknown status " + newStatus); @@ -75,7 +75,7 @@ public class SessionStateWatcher { } public long getSessionId() { - return remoteSession.getSessionId(); + return session.getSessionId(); } public void close() { @@ -93,16 +93,21 @@ public class SessionStateWatcher { ChildData node = fileCache.getCurrentData(); if (node != null) { newStatus = Status.parse(Utf8.toString(node.getData())); - log.log(Level.FINE, remoteSession.logPre() + "Session change: Session " - + remoteSession.getSessionId() + " changed status to " + newStatus.name()); + final String statusName = newStatus.name(); + log.log(Level.FINE, () -> session.logPre() + "Session change: Session " + + getSessionId() + " changed status to " + statusName); sessionStatusChanged(newStatus); } } catch (Exception e) { - log.log(Level.WARNING, remoteSession.logPre() + "Error handling session change to " + + log.log(Level.WARNING, session.logPre() + "Error handling session change to " + newStatus.name() + " for session " + getSessionId(), e); metrics.incSessionChangeErrors(); } }); } + void setSession(RemoteSession session) { + this.session = session; + } + } |