diff options
author | HÃ¥kon Hallingstad <hakon.hallingstad@gmail.com> | 2022-07-11 12:39:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-11 12:39:44 +0200 |
commit | fddbd1c759821594990c3b67abd1a82d40463d49 (patch) | |
tree | 39f3781f06e010d7c03c4bbfda3dae5bc38ed6cd | |
parent | 54281752ea492a263b865192784a47adeab67718 (diff) | |
parent | c1e1c83ccfc68fb73facbba975877d4b58ac5cc6 (diff) |
Merge pull request #23446 from vespa-engine/hmusum/use-session-id
Use session id instead of Session in SessionStateWatcher [run-systemtest]
2 files changed, 46 insertions, 55 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java index d803488cb0a..ecfee6f2b70 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionRepository.java @@ -344,10 +344,10 @@ public class SessionRepository { public RemoteSession createRemoteSession(long sessionId) { SessionZooKeeperClient sessionZKClient = createSessionZooKeeperClient(sessionId); RemoteSession session = new RemoteSession(tenantName, sessionId, sessionZKClient); - RemoteSession newSession = loadSessionIfActive(session).orElse(session); - remoteSessionCache.put(sessionId, newSession); - updateSessionStateWatcher(sessionId, newSession); - return newSession; + loadSessionIfActive(session); + remoteSessionCache.put(sessionId, session); + updateSessionStateWatcher(sessionId); + return session; } public int deleteExpiredRemoteSessions(Clock clock, Duration expiryTime) { @@ -375,9 +375,11 @@ public class SessionRepository { return deleted; } - public void deactivateAndUpdateCache(RemoteSession remoteSession) { - RemoteSession session = remoteSession.deactivated(); - remoteSessionCache.put(session.getSessionId(), session); + public void deactivateAndUpdateCache(long sessionId) { + var s = remoteSessionCache.get(sessionId); + if (s == null) return; + + remoteSessionCache.put(sessionId, s.deactivated()); } public void deleteRemoteSessionFromZooKeeper(Session session) { @@ -388,7 +390,7 @@ public class SessionRepository { } private boolean sessionHasExpired(Instant created, Duration expiryTime, Clock clock) { - return (created.plus(expiryTime).isBefore(clock.instant())); + return created.plus(expiryTime).isBefore(clock.instant()); } private List<Long> getSessionListFromDirectoryCache(List<ChildData> children) { @@ -439,8 +441,11 @@ public class SessionRepository { return session.getStatus() == Session.Status.DELETE; } - void activate(RemoteSession session) { - long sessionId = session.getSessionId(); + void activate(long sessionId) { + createLocalSessionFromDistributedApplicationPackage(sessionId); + RemoteSession session = remoteSessionCache.get(sessionId); + if (session == null) return; + CompletionWaiter waiter = createSessionZooKeeperClient(sessionId).getActiveWaiter(); log.log(Level.FINE, () -> session.logPre() + "Activating " + sessionId); applicationRepo.activateApplication(ensureApplicationLoaded(session), sessionId); @@ -449,21 +454,25 @@ public class SessionRepository { log.log(Level.INFO, session.logPre() + "Session activated: " + sessionId); } - private Optional<RemoteSession> loadSessionIfActive(RemoteSession session) { + private void loadSessionIfActive(RemoteSession session) { for (ApplicationId applicationId : applicationRepo.activeApplications()) { Optional<Long> activeSession = applicationRepo.activeSessionOf(applicationId); if (activeSession.isPresent() && activeSession.get() == session.getSessionId()) { log.log(Level.FINE, () -> "Found active application for session " + session.getSessionId() + " , loading it"); applicationRepo.activateApplication(ensureApplicationLoaded(session), session.getSessionId()); log.log(Level.INFO, session.logPre() + "Application activated successfully: " + applicationId + " (generation " + session.getSessionId() + ")"); - return Optional.ofNullable(remoteSessionCache.get(session.getSessionId())); + return; } } - return Optional.empty(); } - void prepareRemoteSession(RemoteSession session) { - SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(session.getSessionId()); + void prepareRemoteSession(long sessionId) { + // Might need to create local session first + createLocalSessionFromDistributedApplicationPackage(sessionId); + RemoteSession session = remoteSessionCache.get(sessionId); + if (session == null) return; + + SessionZooKeeperClient sessionZooKeeperClient = createSessionZooKeeperClient(sessionId); CompletionWaiter waiter = sessionZooKeeperClient.getPrepareWaiter(); ensureApplicationLoaded(session); notifyCompletion(waiter); @@ -480,7 +489,7 @@ public class SessionRepository { RemoteSession activated = session.activated(applicationSet); long sessionId = activated.getSessionId(); remoteSessionCache.put(sessionId, activated); - updateSessionStateWatcher(sessionId, activated); + updateSessionStateWatcher(sessionId); return applicationSet; } @@ -817,8 +826,9 @@ public class SessionRepository { } /** - * Returns a new local session for the given session id if it does not already exist. - * Will also add the session to the local session cache if necessary + * Create a new local session for the given session id if it does not already exist. + * Will also add the session to the local session cache if necessary. If there is no + * remote session matching the session it will also be created. */ public void createLocalSessionFromDistributedApplicationPackage(long sessionId) { if (applicationRepo.sessionExistsInFileSystem(sessionId)) { @@ -886,15 +896,12 @@ public class SessionRepository { return new TenantFileSystemDirs(configServerDB, tenantName).getUserApplicationDir(sessionId); } - private void updateSessionStateWatcher(long sessionId, RemoteSession remoteSession) { - SessionStateWatcher sessionStateWatcher = sessionStateWatchers.get(sessionId); - if (sessionStateWatcher == null) { - Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(sessionId).getAbsolute(), false); + private void updateSessionStateWatcher(long sessionId) { + sessionStateWatchers.computeIfAbsent(sessionId, (id) -> { + Curator.FileCache fileCache = curator.createFileCache(getSessionStatePath(id).getAbsolute(), false); fileCache.addListener(this::nodeChanged); - sessionStateWatchers.put(sessionId, new SessionStateWatcher(fileCache, remoteSession, metricUpdater, zkWatcherExecutor, this)); - } else { - sessionStateWatcher.updateRemoteSession(remoteSession); - } + return new SessionStateWatcher(fileCache, id, metricUpdater, zkWatcherExecutor, this); + }); } @Override diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java index 6c6f60426fe..abd2266fcf3 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/session/SessionStateWatcher.java @@ -13,7 +13,7 @@ import java.util.logging.Logger; import static com.yahoo.vespa.config.server.session.Session.Status; /** - * Watches one particular session (/config/v2/tenants/<tenantName>/sessions/<n>/sessionState in ZooKeeper) + * Watches session state for a session (/config/v2/tenants/<tenantName>/sessions/<n>/sessionState in ZooKeeper) * The session must be in the session repo. * * @author Vegard Havdal @@ -24,18 +24,18 @@ public class SessionStateWatcher { private static final Logger log = Logger.getLogger(SessionStateWatcher.class.getName()); private final Curator.FileCache fileCache; - private volatile RemoteSession session; + private final long sessionId; private final MetricUpdater metrics; private final Executor zkWatcherExecutor; private final SessionRepository sessionRepository; SessionStateWatcher(Curator.FileCache fileCache, - RemoteSession session, + long sessionId, MetricUpdater metrics, Executor zkWatcherExecutor, SessionRepository sessionRepository) { this.fileCache = fileCache; - this.session = session; + this.sessionId = sessionId; this.metrics = metrics; this.fileCache.addListener(this::nodeChanged); this.fileCache.start(); @@ -44,37 +44,27 @@ public class SessionStateWatcher { } private synchronized void sessionStatusChanged(Status newStatus) { - long sessionId = session.getSessionId(); - switch (newStatus) { case NEW: case UNKNOWN: break; case DELETE: - sessionRepository.deactivateAndUpdateCache(session); + case DEACTIVATE: + sessionRepository.deactivateAndUpdateCache(sessionId); break; case PREPARE: - createLocalSession(sessionId); - sessionRepository.prepareRemoteSession(session); + sessionRepository.prepareRemoteSession(sessionId); break; case ACTIVATE: - createLocalSession(sessionId); - sessionRepository.activate(session); - break; - case DEACTIVATE: - sessionRepository.deactivateAndUpdateCache(session); + sessionRepository.activate(sessionId); break; default: throw new IllegalStateException("Unknown status " + newStatus); } } - private void createLocalSession(long sessionId) { - sessionRepository.createLocalSessionFromDistributedApplicationPackage(sessionId); - } - public long getSessionId() { - return session.getSessionId(); + return sessionId; } public void close() { @@ -93,10 +83,9 @@ public class SessionStateWatcher { if (node != null) { newStatus = Status.parse(Utf8.toString(node.getData())); - String debugMessage = log.isLoggable(Level.FINE) ? - session.logPre() + "Session " + session.getSessionId() - + " changed status to " + newStatus.name() : - null; + String debugMessage = log.isLoggable(Level.FINE) + ? "Session " + sessionId + " changed status to " + newStatus.name() + : null; if (debugMessage != null) log.fine(debugMessage); sessionStatusChanged(newStatus); @@ -104,15 +93,10 @@ public class SessionStateWatcher { if (debugMessage != null) log.fine(debugMessage + ": Done"); } } catch (Exception e) { - log.log(Level.WARNING, session.logPre() + "Error handling session change to " + - newStatus.name() + " for session " + getSessionId(), e); + log.log(Level.WARNING, "Error handling session change to " + newStatus.name() + " for session " + getSessionId(), e); metrics.incSessionChangeErrors(); } }); } - public synchronized void updateRemoteSession(RemoteSession session) { - this.session = session; - } - } |